Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:05 UTC

[41/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Conflicts.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Conflicts.java b/src/java/org/apache/cassandra/db/Conflicts.java
new file mode 100644
index 0000000..fa0e819
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/Conflicts.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.context.CounterContext;
+
+public abstract class Conflicts
+{
+    private Conflicts() {}
+
+    public enum Resolution { LEFT_WINS, MERGE, RIGHT_WINS };
+
+    public static Resolution resolveRegular(long leftTimestamp,
+                                            boolean leftLive,
+                                            int leftLocalDeletionTime,
+                                            ByteBuffer leftValue,
+                                            long rightTimestamp,
+                                            boolean rightLive,
+                                            int rightLocalDeletionTime,
+                                            ByteBuffer rightValue)
+    {
+        if (leftTimestamp != rightTimestamp)
+            return leftTimestamp < rightTimestamp ? Resolution.RIGHT_WINS : Resolution.LEFT_WINS;
+
+        if (leftLive != rightLive)
+            return leftLive ? Resolution.RIGHT_WINS : Resolution.LEFT_WINS;
+
+        int c = leftValue.compareTo(rightValue);
+        if (c < 0)
+            return Resolution.RIGHT_WINS;
+        else if (c > 0)
+            return Resolution.LEFT_WINS;
+
+        // Prefer the longest TTL if relevant
+        return leftLocalDeletionTime < rightLocalDeletionTime ? Resolution.RIGHT_WINS : Resolution.LEFT_WINS;
+    }
+
+    public static Resolution resolveCounter(long leftTimestamp,
+                                            boolean leftLive,
+                                            ByteBuffer leftValue,
+                                            long rightTimestamp,
+                                            boolean rightLive,
+                                            ByteBuffer rightValue)
+    {
+        // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346.
+        if (!leftLive)
+            // left is a tombstone: it has precedence over right if either right is not a tombstone, or left has a greater timestamp
+            return rightLive || leftTimestamp > rightTimestamp ? Resolution.LEFT_WINS : Resolution.RIGHT_WINS;
+
+        // If right is a tombstone and left isn't one, right has precedence
+        if (!rightLive)
+            return Resolution.RIGHT_WINS;
+
+        return Resolution.MERGE;
+    }
+
+    public static ByteBuffer mergeCounterValues(ByteBuffer left, ByteBuffer right)
+    {
+        return CounterContext.instance().merge(left, right);
+    }
+
+}
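
To make the new reconciliation rules concrete, here is a small, hedged
sketch (the ConflictsDemo class and the literal values are illustrative,
not part of this commit) of how resolveRegular breaks ties: timestamp
first, then liveness (a tombstone beats a live cell), then the raw value
bytes, and finally the local deletion time:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import org.apache.cassandra.db.Conflicts;

    public class ConflictsDemo
    {
        public static void main(String[] args)
        {
            ByteBuffer a = ByteBuffer.wrap("a".getBytes(StandardCharsets.UTF_8));
            ByteBuffer b = ByteBuffer.wrap("b".getBytes(StandardCharsets.UTF_8));

            // Different timestamps: the higher timestamp wins outright.
            assert Conflicts.resolveRegular(2L, true, 100, a, 1L, true, 100, b)
                    == Conflicts.Resolution.LEFT_WINS;

            // Equal timestamps: the tombstone (live == false) wins.
            assert Conflicts.resolveRegular(1L, false, 100, a, 1L, true, 100, b)
                    == Conflicts.Resolution.LEFT_WINS;

            // Equal timestamps, both live: the lexically larger value wins ("b" > "a").
            assert Conflicts.resolveRegular(1L, true, 100, a, 1L, true, 100, b)
                    == Conflicts.Resolution.RIGHT_WINS;

            // Everything else equal: the larger local deletion time (longest TTL) wins.
            assert Conflicts.resolveRegular(1L, true, 200, a, 1L, true, 100, a)
                    == Conflicts.Resolution.LEFT_WINS;
        }
    }

Run with assertions enabled (java -ea ConflictsDemo) against a build
containing this patch; each assert mirrors one branch of resolveRegular.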

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java
deleted file mode 100644
index cda1200..0000000
--- a/src/java/org/apache/cassandra/db/CounterCell.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-
-/**
- * A column that represents a partitioned counter.
- */
-public interface CounterCell extends Cell
-{
-    static final CounterContext contextManager = CounterContext.instance();
-
-    public long timestampOfLastDelete();
-
-    public long total();
-
-    public boolean hasLegacyShards();
-
-    public Cell markLocalToBeCleared();
-
-    CounterCell localCopy(CFMetaData metadata, AbstractAllocator allocator);
-
-    CounterCell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup);
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 58717b4..f87c66c 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db;
 
 import java.io.DataInput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -27,12 +26,15 @@ import java.util.concurrent.locks.Lock;
 import com.google.common.base.Function;
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
 import com.google.common.util.concurrent.Striped;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -41,6 +43,7 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public class CounterMutation implements IMutation
 {
@@ -67,9 +70,9 @@ public class CounterMutation implements IMutation
         return mutation.getColumnFamilyIds();
     }
 
-    public Collection<ColumnFamily> getColumnFamilies()
+    public Collection<PartitionUpdate> getPartitionUpdates()
     {
-        return mutation.getColumnFamilies();
+        return mutation.getPartitionUpdates();
     }
 
     public Mutation getMutation()
@@ -77,7 +80,7 @@ public class CounterMutation implements IMutation
         return mutation;
     }
 
-    public ByteBuffer key()
+    public DecoratedKey key()
     {
         return mutation.key();
     }
@@ -111,19 +114,14 @@ public class CounterMutation implements IMutation
         Mutation result = new Mutation(getKeyspaceName(), key());
         Keyspace keyspace = Keyspace.open(getKeyspaceName());
 
-        int count = 0;
-        for (ColumnFamily cf : getColumnFamilies())
-            count += cf.getColumnCount();
-
-        List<Lock> locks = new ArrayList<>(count);
-        Tracing.trace("Acquiring {} counter locks", count);
+        List<Lock> locks = new ArrayList<>();
+        Tracing.trace("Acquiring counter locks");
         try
         {
             grabCounterLocks(keyspace, locks);
-            for (ColumnFamily cf : getColumnFamilies())
-                result.add(processModifications(cf));
+            for (PartitionUpdate upd : getPartitionUpdates())
+                result.add(processModifications(upd));
             result.apply();
-            updateCounterCache(result, keyspace);
             return result;
         }
         finally
@@ -160,141 +158,144 @@ public class CounterMutation implements IMutation
      */
     private Iterable<Object> getCounterLockKeys()
     {
-        return Iterables.concat(Iterables.transform(getColumnFamilies(), new Function<ColumnFamily, Iterable<Object>>()
+        return Iterables.concat(Iterables.transform(getPartitionUpdates(), new Function<PartitionUpdate, Iterable<Object>>()
         {
-            public Iterable<Object> apply(final ColumnFamily cf)
+            public Iterable<Object> apply(final PartitionUpdate update)
             {
-                return Iterables.transform(cf, new Function<Cell, Object>()
+                return Iterables.concat(Iterables.transform(update, new Function<Row, Iterable<Object>>()
                 {
-                    public Object apply(Cell cell)
+                    public Iterable<Object> apply(final Row row)
                     {
-                        return Objects.hashCode(cf.id(), key(), cell.name());
+                        return Iterables.concat(Iterables.transform(row, new Function<Cell, Object>()
+                        {
+                            public Object apply(final Cell cell)
+                            {
+                                return Objects.hashCode(update.metadata().cfId, key(), row.clustering(), cell.column(), cell.path());
+                            }
+                        }));
                     }
-                });
+                }));
             }
         }));
     }
 
-    // Replaces all the CounterUpdateCell-s with updated regular CounterCell-s
-    private ColumnFamily processModifications(ColumnFamily changesCF)
+    private PartitionUpdate processModifications(PartitionUpdate changes)
     {
-        ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changesCF.id());
+        ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changes.metadata().cfId);
 
-        ColumnFamily resultCF = changesCF.cloneMeShallow();
+        List<PartitionUpdate.CounterMark> marks = changes.collectCounterMarks();
 
-        List<CounterUpdateCell> counterUpdateCells = new ArrayList<>(changesCF.getColumnCount());
-        for (Cell cell : changesCF)
+        if (CacheService.instance.counterCache.getCapacity() != 0)
         {
-            if (cell instanceof CounterUpdateCell)
-                counterUpdateCells.add((CounterUpdateCell)cell);
-            else
-                resultCF.addColumn(cell);
+            Tracing.trace("Fetching {} counter values from cache", marks.size());
+            updateWithCurrentValuesFromCache(marks, cfs);
+            if (marks.isEmpty())
+                return changes;
         }
 
-        if (counterUpdateCells.isEmpty())
-            return resultCF; // only DELETEs
-
-        ClockAndCount[] currentValues = getCurrentValues(counterUpdateCells, cfs);
-        for (int i = 0; i < counterUpdateCells.size(); i++)
-        {
-            ClockAndCount currentValue = currentValues[i];
-            CounterUpdateCell update = counterUpdateCells.get(i);
+        Tracing.trace("Reading {} counter values from the CF", marks.size());
+        updateWithCurrentValuesFromCFS(marks, cfs);
 
-            long clock = currentValue.clock + 1L;
-            long count = currentValue.count + update.delta();
+        // Whatever remains is a new counter
+        for (PartitionUpdate.CounterMark mark : marks)
+            updateWithCurrentValue(mark, ClockAndCount.BLANK, cfs);
 
-            resultCF.addColumn(new BufferCounterCell(update.name(),
-                                                     CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count),
-                                                     update.timestamp()));
-        }
-
-        return resultCF;
+        return changes;
     }
 
-    // Attempt to load the current values(s) from cache. If that fails, read the rest from the cfs.
-    private ClockAndCount[] getCurrentValues(List<CounterUpdateCell> counterUpdateCells, ColumnFamilyStore cfs)
+    private void updateWithCurrentValue(PartitionUpdate.CounterMark mark, ClockAndCount currentValue, ColumnFamilyStore cfs)
     {
-        ClockAndCount[] currentValues = new ClockAndCount[counterUpdateCells.size()];
-        int remaining = counterUpdateCells.size();
+        long clock = currentValue.clock + 1L;
+        long count = currentValue.count + CounterContext.instance().total(mark.value());
 
-        if (CacheService.instance.counterCache.getCapacity() != 0)
-        {
-            Tracing.trace("Fetching {} counter values from cache", counterUpdateCells.size());
-            remaining = getCurrentValuesFromCache(counterUpdateCells, cfs, currentValues);
-            if (remaining == 0)
-                return currentValues;
-        }
-
-        Tracing.trace("Reading {} counter values from the CF", remaining);
-        getCurrentValuesFromCFS(counterUpdateCells, cfs, currentValues);
+        mark.setValue(CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count));
 
-        return currentValues;
+        // Cache the newly updated value
+        cfs.putCachedCounter(key().getKey(), mark.clustering(), mark.column(), mark.path(), ClockAndCount.create(clock, count));
     }
 
-    // Returns the count of cache misses.
+    // Updates the marks for which the counter cache has a value, removing them from the list.
-    private int getCurrentValuesFromCache(List<CounterUpdateCell> counterUpdateCells,
-                                          ColumnFamilyStore cfs,
-                                          ClockAndCount[] currentValues)
+    private void updateWithCurrentValuesFromCache(List<PartitionUpdate.CounterMark> marks, ColumnFamilyStore cfs)
     {
-        int cacheMisses = 0;
-        for (int i = 0; i < counterUpdateCells.size(); i++)
+        Iterator<PartitionUpdate.CounterMark> iter = marks.iterator();
+        while (iter.hasNext())
         {
-            ClockAndCount cached = cfs.getCachedCounter(key(), counterUpdateCells.get(i).name());
+            PartitionUpdate.CounterMark mark = iter.next();
+            ClockAndCount cached = cfs.getCachedCounter(key().getKey(), mark.clustering(), mark.column(), mark.path());
             if (cached != null)
-                currentValues[i] = cached;
-            else
-                cacheMisses++;
+            {
+                updateWithCurrentValue(mark, cached, cfs);
+                iter.remove();
+            }
         }
-        return cacheMisses;
     }
 
     // Reads the missing current values from the CFS.
-    private void getCurrentValuesFromCFS(List<CounterUpdateCell> counterUpdateCells,
-                                         ColumnFamilyStore cfs,
-                                         ClockAndCount[] currentValues)
+    private void updateWithCurrentValuesFromCFS(List<PartitionUpdate.CounterMark> marks, ColumnFamilyStore cfs)
     {
-        SortedSet<CellName> names = new TreeSet<>(cfs.metadata.comparator);
-        for (int i = 0; i < currentValues.length; i++)
-            if (currentValues[i] == null)
-                names.add(counterUpdateCells.get(i).name());
-
-        ReadCommand cmd = new SliceByNamesReadCommand(getKeyspaceName(), key(), cfs.metadata.cfName, Long.MIN_VALUE, new NamesQueryFilter(names));
-        Row row = cmd.getRow(cfs.keyspace);
-        ColumnFamily cf = row == null ? null : row.cf;
+        ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
+        NavigableSet<Clustering> names = new TreeSet<>(cfs.metadata.comparator);
+        for (PartitionUpdate.CounterMark mark : marks)
+        {
+            names.add(mark.clustering().takeAlias());
+            if (mark.path() == null)
+                builder.add(mark.column());
+            else
+                builder.select(mark.column(), mark.path());
+        }
 
-        for (int i = 0; i < currentValues.length; i++)
+        int nowInSec = FBUtilities.nowInSeconds();
+        ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(names, false);
+        SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(cfs.metadata, nowInSec, key(), builder.build(), filter);
+        PeekingIterator<PartitionUpdate.CounterMark> markIter = Iterators.peekingIterator(marks.iterator());
+        try (OpOrder.Group op = cfs.readOrdering.start(); RowIterator partition = UnfilteredRowIterators.filter(cmd.queryMemtableAndDisk(cfs, op), nowInSec))
         {
-            if (currentValues[i] != null)
-                continue;
+            updateForRow(markIter, partition.staticRow(), cfs);
 
-            Cell cell = cf == null ? null : cf.getColumn(counterUpdateCells.get(i).name());
-            if (cell == null || !cell.isLive()) // absent or a tombstone.
-                currentValues[i] = ClockAndCount.BLANK;
-            else
-                currentValues[i] = CounterContext.instance().getLocalClockAndCount(cell.value());
+            while (partition.hasNext())
+            {
+                if (!markIter.hasNext())
+                    return;
+
+                updateForRow(markIter, partition.next(), cfs);
+            }
         }
     }
 
-    private void updateCounterCache(Mutation applied, Keyspace keyspace)
+    private int compare(Clustering c1, Clustering c2, ColumnFamilyStore cfs)
     {
-        if (CacheService.instance.counterCache.getCapacity() == 0)
-            return;
+        if (c1 == Clustering.STATIC_CLUSTERING)
+            return c2 == Clustering.STATIC_CLUSTERING ? 0 : -1;
+        if (c2 == Clustering.STATIC_CLUSTERING)
+            return 1;
 
-        for (ColumnFamily cf : applied.getColumnFamilies())
-        {
-            ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf.id());
-            for (Cell cell : cf)
-                if (cell instanceof CounterCell)
-                    cfs.putCachedCounter(key(), cell.name(), CounterContext.instance().getLocalClockAndCount(cell.value()));
-        }
+        return cfs.getComparator().compare(c1, c2);
     }
 
-    public void addAll(IMutation m)
+    private void updateForRow(PeekingIterator<PartitionUpdate.CounterMark> markIter, Row row, ColumnFamilyStore cfs)
     {
-        if (!(m instanceof CounterMutation))
-            throw new IllegalArgumentException();
-        CounterMutation cm = (CounterMutation)m;
-        mutation.addAll(cm.mutation);
+        int cmp = 0;
+        // If the mark is before the row, we have no value for this mark, just consume it
+        while (markIter.hasNext() && (cmp = compare(markIter.peek().clustering(), row.clustering(), cfs)) < 0)
+            markIter.next();
+
+        if (!markIter.hasNext())
+            return;
+
+        while (cmp == 0)
+        {
+            PartitionUpdate.CounterMark mark = markIter.next();
+            Cell cell = mark.path() == null ? row.getCell(mark.column()) : row.getCell(mark.column(), mark.path());
+            if (cell != null)
+            {
+                updateWithCurrentValue(mark, CounterContext.instance().getLocalClockAndCount(cell.value()), cfs);
+                markIter.remove();
+            }
+            if (!markIter.hasNext())
+                return;
+
+            cmp = compare(markIter.peek().clustering(), row.clustering(), cfs);
+        }
     }
 
     public long getTimeout()
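
The reworked processModifications/updateWithCurrentValue* path above
resolves each counter mark in up to three passes: first from the counter
cache, then from the memtables/sstables, and finally by treating whatever
is left as a brand-new counter starting from a blank value. A standalone,
hedged sketch of that pattern (all names here are illustrative stand-ins;
the real code works on PartitionUpdate.CounterMark and ClockAndCount):

    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    public class CounterResolutionSketch
    {
        // Stand-in for PartitionUpdate.CounterMark: a pending delta plus
        // the value it resolves to once the current count is known.
        static final class Mark
        {
            final String name;
            final long delta;
            long resolved;
            Mark(String name, long delta) { this.name = name; this.delta = delta; }
        }

        static void resolve(List<Mark> marks, Map<String, Long> cache, Map<String, Long> storage)
        {
            // Pass 1: consume the marks the cache can satisfy. Removal only
            // tracks what is left to resolve; each Mark is still referenced
            // by the update being built.
            for (Iterator<Mark> it = marks.iterator(); it.hasNext(); )
            {
                Mark mark = it.next();
                Long current = cache.get(mark.name);
                if (current != null)
                {
                    mark.resolved = current + mark.delta;
                    it.remove();
                }
            }

            // Pass 2: read the remaining current values from storage.
            for (Iterator<Mark> it = marks.iterator(); it.hasNext(); )
            {
                Mark mark = it.next();
                Long current = storage.get(mark.name);
                if (current != null)
                {
                    mark.resolved = current + mark.delta;
                    it.remove();
                }
            }

            // Pass 3: anything still unresolved is a new counter, so the
            // current value is blank (zero) and the delta stands alone.
            for (Mark mark : marks)
                mark.resolved = mark.delta;
        }
    }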

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CounterUpdateCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateCell.java b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
deleted file mode 100644
index 58ac365..0000000
--- a/src/java/org/apache/cassandra/db/CounterUpdateCell.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-/**
- * A counter update while it hasn't been applied yet by the leader replica.
- *
- * Contains a single counter update. When applied by the leader replica, this
- * is transformed to a relevant CounterCell. This Cell is a temporary data
- * structure that should never be stored inside a memtable or an sstable.
- */
-public interface CounterUpdateCell extends Cell
-{
-    public long delta();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index 1e6f8c8..909d6ed 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -6,7 +6,6 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -17,302 +16,391 @@
  */
 package org.apache.cassandra.db;
 
+import java.io.DataInput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
 
-import com.google.common.base.Objects;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.Composites;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
 
 /**
- * Groups key range and column filter for range queries.
- *
- * The main "trick" of this class is that the column filter can only
- * be obtained by providing the row key on which the column filter will
- * be applied (which we always know before actually querying the columns).
- *
- * This allows the paging DataRange to return a filter for most rows but a
- * potentially different ones for the starting and stopping key. Could
- * allow more fancy stuff in the future too, like column filters that
- * depend on the actual key value :)
+ * Groups both the range of partitions to query, and the clustering index filter to
+ * apply for each partition (for a (partition) range query).
+ * <p>
+ * The main "trick" is that the clustering index filter can only be obtained by
+ * providing the partition key on which the filter will be applied. This is
+ * necessary when paging range queries, as we might need a different filter
+ * for the starting key than for other keys (because the previous page we had
+ * queried may have ended in the middle of a partition).
  */
 public class DataRange
 {
-    protected final AbstractBounds<RowPosition> keyRange;
-    protected IDiskAtomFilter columnFilter;
-    protected final boolean selectFullRow;
+    public static final Serializer serializer = new Serializer();
 
-    public DataRange(AbstractBounds<RowPosition> range, IDiskAtomFilter columnFilter)
+    private final AbstractBounds<PartitionPosition> keyRange;
+    protected final ClusteringIndexFilter clusteringIndexFilter;
+
+    /**
+     * Creates a {@code DataRange} given a range of partition keys and a clustering index filter. The
+     * returned {@code DataRange} will return the same filter for all keys.
+     *
+     * @param range the range over partition keys to use.
+     * @param clusteringIndexFilter the clustering index filter to use.
+     */
+    public DataRange(AbstractBounds<PartitionPosition> range, ClusteringIndexFilter clusteringIndexFilter)
     {
         this.keyRange = range;
-        this.columnFilter = columnFilter;
-        this.selectFullRow = columnFilter instanceof SliceQueryFilter
-                           ? isFullRowSlice((SliceQueryFilter)columnFilter)
-                           : false;
+        this.clusteringIndexFilter = clusteringIndexFilter;
     }
 
-    public static boolean isFullRowSlice(SliceQueryFilter filter)
+    /**
+     * Creates a {@code DataRange} to query all data (over the whole ring).
+     *
+     * @param partitioner the partitioner in use for the table.
+     *
+     * @return the newly created {@code DataRange}.
+     */
+    public static DataRange allData(IPartitioner partitioner)
     {
-        return filter.slices.length == 1
-            && filter.start().isEmpty()
-            && filter.finish().isEmpty()
-            && filter.count == Integer.MAX_VALUE;
+        return forTokenRange(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
     }
 
-    public static DataRange allData(IPartitioner partitioner)
+    /**
+     * Creates a {@code DataRange} to query all rows over the provided token range.
+     *
+     * @param tokenRange the (partition key) token range to query.
+     *
+     * @return the newly created {@code DataRange}.
+     */
+    public static DataRange forTokenRange(Range<Token> tokenRange)
     {
-        return forTokenRange(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
+        return forKeyRange(Range.makeRowRange(tokenRange));
     }
 
-    public static DataRange forTokenRange(Range<Token> keyRange)
+    /**
+     * Creates a {@code DataRange} to query all rows over the provided key range.
+     *
+     * @param keyRange the (partition key) range to query.
+     *
+     * @return the newly created {@code DataRange}.
+     */
+    public static DataRange forKeyRange(Range<PartitionPosition> keyRange)
     {
-        return forKeyRange(Range.makeRowRange(keyRange));
+        return new DataRange(keyRange, new ClusteringIndexSliceFilter(Slices.ALL, false));
     }
 
-    public static DataRange forKeyRange(Range<RowPosition> keyRange)
+    /**
+     * Creates a {@code DataRange} to query all partitions of the ring using the provided
+     * clustering index filter.
+     *
+     * @param partitioner the partitioner in use for the table queried.
+     * @param filter the clustering index filter to use.
+     *
+     * @return the newly created {@code DataRange}.
+     */
+    public static DataRange allData(IPartitioner partitioner, ClusteringIndexFilter filter)
     {
-        return new DataRange(keyRange, new IdentityQueryFilter());
+        return new DataRange(Range.makeRowRange(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken())), filter);
     }
 
-    public AbstractBounds<RowPosition> keyRange()
+    /**
+     * The range of partition keys queried by this {@code DataRange}.
+     *
+     * @return the range of partition keys queried by this {@code DataRange}.
+     */
+    public AbstractBounds<PartitionPosition> keyRange()
     {
         return keyRange;
     }
 
-    public RowPosition startKey()
+    /**
+     * The start of the partition key range queried by this {@code DataRange}.
+     *
+     * @return the start of the partition key range queried by this {@code DataRange}.
+     */
+    public PartitionPosition startKey()
     {
         return keyRange.left;
     }
 
-    public RowPosition stopKey()
+    /**
+     * The end of the partition key range queried by this {@code DataRange}.
+     *
+     * @return the end of the partition key range queried by this {@code DataRange}.
+     */
+    public PartitionPosition stopKey()
     {
         return keyRange.right;
     }
 
     /**
-     * Returns true if tombstoned partitions should not be included in results or count towards the limit.
-     * See CASSANDRA-8490 for more details on why this is needed (and done this way).
-     * */
-    public boolean ignoredTombstonedPartitions()
+     * Whether the underlying clustering index filter is a names filter or not.
+     *
+     * @return Whether the underlying clustering index filter is a names filter or not.
+     */
+    public boolean isNamesQuery()
     {
-        if (!(columnFilter instanceof SliceQueryFilter))
-            return false;
-
-        return ((SliceQueryFilter) columnFilter).compositesToGroup == SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS;
+        return clusteringIndexFilter instanceof ClusteringIndexNamesFilter;
     }
 
-    // Whether the bounds of this DataRange actually wraps around.
+    /**
+     * Whether the range queried by this {@code DataRange} actually wraps around.
+     *
+     * @return whether the range queried by this {@code DataRange} actually wraps around.
+     */
     public boolean isWrapAround()
     {
-        // On range can ever wrap
+        // Only range can ever wrap
         return keyRange instanceof Range && ((Range<?>)keyRange).isWrapAround();
     }
 
-    public boolean contains(RowPosition pos)
+    /**
+     * Whether the provided ring position is covered by this {@code DataRange}.
+     *
+     * @return whether the provided ring position is covered by this {@code DataRange}.
+     */
+    public boolean contains(PartitionPosition pos)
     {
         return keyRange.contains(pos);
     }
 
-    public int getLiveCount(ColumnFamily data, long now)
+    /**
+     * Whether this {@code DataRange} queries everything (has no restriction on either the
+     * partitions queried, or on the rows within each partition).
+     *
+     * @return Whether this {@code DataRange} queries everything.
+     */
+    public boolean isUnrestricted()
     {
-        return columnFilter instanceof SliceQueryFilter
-             ? ((SliceQueryFilter)columnFilter).lastCounted()
-             : columnFilter.getLiveCount(data, now);
+        return startKey().isMinimum() && stopKey().isMinimum() && clusteringIndexFilter.selectsAllPartition();
     }
 
-    public boolean selectsFullRowFor(ByteBuffer rowKey)
+    /**
+     * The clustering index filter to use for the provided key.
+     * <p>
+     * This may or may not be the same filter for all keys (that is, paging ranges
+     * use a different filter for their start key).
+     *
+     * @param key the partition key for which we want the clustering index filter.
+     *
+     * @return the clustering filter to use for {@code key}.
+     */
+    public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
     {
-        return selectFullRow;
+        return clusteringIndexFilter;
     }
 
     /**
-     * Returns a column filter that should be used for a particular row key.  Note that in the case of paging,
-     * slice starts and ends may change depending on the row key.
+     * Returns a new {@code DataRange} for use when paging {@code this} range.
+     *
+     * @param range the range of partition keys to query.
+     * @param comparator the comparator for the table queried.
+     * @param lastReturned the clustering for the last result returned by the previous page, i.e. the result we want to start our new page
+     * from. This last returned result <b>must</b> correspond to the left bound of {@code range} (in other words, {@code range.left} must be the
+     * partition key for that {@code lastReturned} result).
+     * @param inclusive whether or not we want to include the {@code lastReturned} in the newly returned page of results.
+     *
+     * @return a new {@code DataRange} suitable for paging {@code this} range given the {@code lastReturned} result of the previous page.
      */
-    public IDiskAtomFilter columnFilter(ByteBuffer rowKey)
+    public DataRange forPaging(AbstractBounds<PartitionPosition> range, ClusteringComparator comparator, Clustering lastReturned, boolean inclusive)
     {
-        return columnFilter;
+        return new Paging(range, clusteringIndexFilter, comparator, lastReturned, inclusive);
     }
 
     /**
-     * Sets a new limit on the number of (grouped) cells to fetch. This is currently only used when the query limit applies
-     * to CQL3 rows.
+     * Returns a new {@code DataRange} equivalent to {@code this} one but restricted to the provided sub-range.
+     *
+     * @param range the sub-range to use for the newly returned data range. Note that this assumes that {@code range} is a proper
+     * sub-range of the initial range but doesn't validate it. You should make sure to only provide sub-ranges, or this
+     * might throw off the paging case (see Paging.forSubRange()).
+     *
+     * @return a new {@code DataRange} using {@code range} as its partition key range and the clustering index filter from {@code this}.
      */
-    public void updateColumnsLimit(int count)
+    public DataRange forSubRange(AbstractBounds<PartitionPosition> range)
     {
-        columnFilter.updateColumnsLimit(count);
+        return new DataRange(range, clusteringIndexFilter);
     }
 
-    public static class Paging extends DataRange
+    public String toString(CFMetaData metadata)
     {
-        // The slice of columns that we want to fetch for each row, ignoring page start/end issues.
-        private final SliceQueryFilter sliceFilter;
+        return String.format("range=%s pfilter=%s", keyRange.getString(metadata.getKeyValidator()), clusteringIndexFilter.toString(metadata));
+    }
 
-        private final CFMetaData cfm;
+    public String toCQLString(CFMetaData metadata)
+    {
+        if (isUnrestricted())
+            return "UNRESTRICTED";
 
-        private final Comparator<Composite> comparator;
+        StringBuilder sb = new StringBuilder();
 
-        // used to restrict the start of the slice for the first partition in the range
-        private final Composite firstPartitionColumnStart;
+        boolean needAnd = false;
+        if (!startKey().isMinimum())
+        {
+            appendClause(startKey(), sb, metadata, true, keyRange.isStartInclusive());
+            needAnd = true;
+        }
+        if (!stopKey().isMinimum())
+        {
+            if (needAnd)
+                sb.append(" AND ");
+            appendClause(stopKey(), sb, metadata, false, keyRange.isEndInclusive());
+            needAnd = true;
+        }
 
-        // used to restrict the end of the slice for the last partition in the range
-        private final Composite lastPartitionColumnFinish;
+        String filterString = clusteringIndexFilter.toCQLString(metadata);
+        if (!filterString.isEmpty())
+            sb.append(needAnd ? " AND " : "").append(filterString);
 
-        // tracks the last key that we updated the filter for to avoid duplicating work
-        private ByteBuffer lastKeyFilterWasUpdatedFor;
+        return sb.toString();
+    }
 
-        private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite firstPartitionColumnStart,
-                       Composite lastPartitionColumnFinish, CFMetaData cfm, Comparator<Composite> comparator)
+    private void appendClause(PartitionPosition pos, StringBuilder sb, CFMetaData metadata, boolean isStart, boolean isInclusive)
+    {
+        sb.append("token(");
+        sb.append(ColumnDefinition.toCQLString(metadata.partitionKeyColumns()));
+        sb.append(") ").append(getOperator(isStart, isInclusive)).append(" ");
+        if (pos instanceof DecoratedKey)
+        {
+            sb.append("token(");
+            appendKeyString(sb, metadata.getKeyValidator(), ((DecoratedKey)pos).getKey());
+            sb.append(")");
+        }
+        else
+        {
+            sb.append(((Token.KeyBound)pos).getToken());
+        }
+    }
+
+    private static String getOperator(boolean isStart, boolean isInclusive)
+    {
+        return isStart
+             ? (isInclusive ? ">=" : ">")
+             : (isInclusive ? "<=" : "<");
+    }
+
+    // TODO: this is reused in SinglePartitionReadCommand but this should not really be here. Ideally
+    // we need a more "native" handling of composite partition keys.
+    public static void appendKeyString(StringBuilder sb, AbstractType<?> type, ByteBuffer key)
+    {
+        if (type instanceof CompositeType)
+        {
+            CompositeType ct = (CompositeType)type;
+            ByteBuffer[] values = ct.split(key);
+            for (int i = 0; i < ct.types.size(); i++)
+                sb.append(i == 0 ? "" : ", ").append(ct.types.get(i).getString(values[i]));
+        }
+        else
+        {
+            sb.append(type.getString(key));
+        }
+    }
+
+    /**
+     * Specialized {@code DataRange} used for the paging case.
+     * <p>
+     * It uses the clustering of the last result of the previous page to restrict the filter on the
+     * first queried partition (the one for that last result) so it only fetches results that follow that
+     * last result. In other words, this makes sure we resume paging where we left off.
+     */
+    private static class Paging extends DataRange
+    {
+        private final ClusteringComparator comparator;
+        private final Clustering lastReturned;
+        private final boolean inclusive;
+
+        private Paging(AbstractBounds<PartitionPosition> range,
+                       ClusteringIndexFilter filter,
+                       ClusteringComparator comparator,
+                       Clustering lastReturned,
+                       boolean inclusive)
         {
             super(range, filter);
 
             // When using a paging range, we don't allow wrapped ranges, as it's unclear how to handle them properly.
-            // This is ok for now since we only need this in range slice queries, and the range are "unwrapped" in that case.
+            // This is ok for now since we only need this in range queries, and the ranges are "unwrapped" in that case.
             assert !(range instanceof Range) || !((Range<?>)range).isWrapAround() || range.right.isMinimum() : range;
+            assert lastReturned != null;
 
-            this.sliceFilter = filter;
-            this.cfm = cfm;
             this.comparator = comparator;
-            this.firstPartitionColumnStart = firstPartitionColumnStart;
-            this.lastPartitionColumnFinish = lastPartitionColumnFinish;
-            this.lastKeyFilterWasUpdatedFor = null;
+            this.lastReturned = lastReturned;
+            this.inclusive = inclusive;
         }
 
-        public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite columnStart, Composite columnFinish, CFMetaData cfm)
+        @Override
+        public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
         {
-            this(range, filter, columnStart, columnFinish, cfm, filter.isReversed() ? cfm.comparator.reverseComparator() : cfm.comparator);
+            return key.equals(startKey())
+                 ? clusteringIndexFilter.forPaging(comparator, lastReturned, inclusive)
+                 : clusteringIndexFilter;
         }
 
         @Override
-        public boolean selectsFullRowFor(ByteBuffer rowKey)
+        public DataRange forSubRange(AbstractBounds<PartitionPosition> range)
         {
-            // If we initial filter is not the full filter, don't bother
-            if (!selectFullRow)
-                return false;
-
-            if (!equals(startKey(), rowKey) && !equals(stopKey(), rowKey))
-                return true;
+            // This is called for a subrange of the initial range. So either it's the beginning of the initial range,
+            // and we need to preserve lastReturned, or it's not, and we don't care about it anymore.
+            return range.left.equals(keyRange().left)
+                 ? new Paging(range, clusteringIndexFilter, comparator, lastReturned, inclusive)
+                 : new DataRange(range, clusteringIndexFilter);
+        }
 
-            return isFullRowSlice((SliceQueryFilter)columnFilter(rowKey));
+        @Override
+        public boolean isUnrestricted()
+        {
+            return false;
         }
+    }
 
-        private boolean equals(RowPosition pos, ByteBuffer rowKey)
+    public static class Serializer
+    {
+        public void serialize(DataRange range, DataOutputPlus out, int version, CFMetaData metadata) throws IOException
         {
-            return pos instanceof DecoratedKey && ((DecoratedKey)pos).getKey().equals(rowKey);
+            AbstractBounds.rowPositionSerializer.serialize(range.keyRange, out, version);
+            ClusteringIndexFilter.serializer.serialize(range.clusteringIndexFilter, out, version);
+            boolean isPaging = range instanceof Paging;
+            out.writeBoolean(isPaging);
+            if (isPaging)
+            {
+                Clustering.serializer.serialize(((Paging)range).lastReturned, out, version, metadata.comparator.subtypes());
+                out.writeBoolean(((Paging)range).inclusive);
+            }
         }
 
-        @Override
-        public IDiskAtomFilter columnFilter(ByteBuffer rowKey)
+        public DataRange deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
         {
-            /*
-             * We have that ugly hack that for slice queries, when we ask for
-             * the live count, we reach into the query filter to get the last
-             * counter number of columns to avoid recounting.
-             * Maybe we should just remove that hack, but in the meantime, we
-             * need to keep a reference the last returned filter.
-             */
-            if (equals(startKey(), rowKey) || equals(stopKey(), rowKey))
+            AbstractBounds<PartitionPosition> range = AbstractBounds.rowPositionSerializer.deserialize(in, MessagingService.globalPartitioner(), version);
+            ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
+            if (in.readBoolean())
             {
-                if (!rowKey.equals(lastKeyFilterWasUpdatedFor))
-                {
-                    this.lastKeyFilterWasUpdatedFor = rowKey;
-                    columnFilter = sliceFilter.withUpdatedSlices(slicesForKey(rowKey));
-                }
+                ClusteringComparator comparator = metadata.comparator;
+                Clustering lastReturned = Clustering.serializer.deserialize(in, version, comparator.subtypes());
+                boolean inclusive = in.readBoolean();
+                return new Paging(range, filter, comparator, lastReturned, inclusive);
             }
             else
             {
-                columnFilter = sliceFilter;
+                return new DataRange(range, filter);
             }
-
-            return columnFilter;
         }
 
-        /** Returns true if the slice includes static columns, false otherwise. */
-        private boolean sliceIncludesStatics(ColumnSlice slice, boolean reversed, CFMetaData cfm)
+        public long serializedSize(DataRange range, int version, CFMetaData metadata)
         {
-            return cfm.hasStaticColumns() &&
-                   slice.includes(reversed ? cfm.comparator.reverseComparator() : cfm.comparator, cfm.comparator.staticPrefix().end());
-        }
+            long size = AbstractBounds.rowPositionSerializer.serializedSize(range.keyRange, version)
+                      + ClusteringIndexFilter.serializer.serializedSize(range.clusteringIndexFilter, version)
+                      + 1; // isPaging boolean
 
-        private ColumnSlice[] slicesForKey(ByteBuffer key)
-        {
-            // Also note that firstPartitionColumnStart and lastPartitionColumnFinish, when used, only "restrict" the filter slices,
-            // it doesn't expand on them. As such, we can ignore the case where they are empty and we do
-            // as it screw up with the logic below (see #6592)
-            Composite newStart = equals(startKey(), key) && !firstPartitionColumnStart.isEmpty() ? firstPartitionColumnStart : null;
-            Composite newFinish = equals(stopKey(), key) && !lastPartitionColumnFinish.isEmpty() ? lastPartitionColumnFinish : null;
-
-            // in the common case, we'll have the same number of slices
-            List<ColumnSlice> newSlices = new ArrayList<>(sliceFilter.slices.length);
-
-            // Check our slices to see if any fall before the page start (in which case they can be removed) or
-            // if they contain the page start (in which case they should start from the page start).  However, if the
-            // slices would include static columns, we need to ensure they are also fetched, and so a separate
-            // slice for the static columns may be required.
-            // Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so
-            // the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details.
-            for (ColumnSlice slice : sliceFilter.slices)
+            if (range instanceof Paging)
             {
-                if (newStart != null)
-                {
-                    if (slice.isBefore(comparator, newStart))
-                    {
-                        if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm))
-                            newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
-
-                        continue;
-                    }
-
-                    if (slice.includes(comparator, newStart))
-                    {
-                        if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm) && !newStart.equals(Composites.EMPTY))
-                            newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
-
-                        slice = new ColumnSlice(newStart, slice.finish);
-                    }
-
-                    // once we see a slice that either includes the page start or is after it, we can stop checking
-                    // against the page start (because the slices are ordered)
-                    newStart = null;
-                }
-
-                assert newStart == null;
-                if (newFinish != null && !slice.isBefore(comparator, newFinish))
-                {
-                    if (slice.includes(comparator, newFinish))
-                        newSlices.add(new ColumnSlice(slice.start, newFinish));
-                    // In any case, we're done
-                    break;
-                }
-                newSlices.add(slice);
+                size += Clustering.serializer.serializedSize(((Paging)range).lastReturned, version, metadata.comparator.subtypes(), TypeSizes.NATIVE);
+                size += 1; // inclusive boolean
             }
-
-            return newSlices.toArray(new ColumnSlice[newSlices.size()]);
-        }
-
-        @Override
-        public void updateColumnsLimit(int count)
-        {
-            columnFilter.updateColumnsLimit(count);
-            sliceFilter.updateColumnsLimit(count);
-        }
-
-        @Override
-        public String toString()
-        {
-            return Objects.toStringHelper(this)
-                          .add("keyRange", keyRange)
-                          .add("sliceFilter", sliceFilter)
-                          .add("columnFilter", columnFilter)
-                          .add("firstPartitionColumnStart", firstPartitionColumnStart == null ? "null" : cfm.comparator.getString(firstPartitionColumnStart))
-                          .add("lastPartitionColumnFinish", lastPartitionColumnFinish == null ? "null" : cfm.comparator.getString(lastPartitionColumnFinish))
-                          .toString();
+            return size;
         }
     }
 }
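
The essential behaviour of the Paging subclass above is that only the
partition the previous page stopped in gets the narrowed filter; every
other partition keeps the original one. A hedged, simplified model of
that dispatch (String stands in for DecoratedKey and Clustering, and
Filter for ClusteringIndexFilter; compare with Paging.clusteringIndexFilter
in the diff):

    public class PagingDispatchSketch
    {
        interface Filter
        {
            // Returns a copy of this filter restricted to results after
            // (or from, if inclusive) the last returned clustering.
            Filter forPaging(String lastReturned, boolean inclusive);
        }

        static Filter filterFor(String key, String startKey, Filter base,
                                String lastReturned, boolean inclusive)
        {
            // Only the start key of the range can have been left mid-partition
            // by the previous page, so only it needs the restricted filter.
            return key.equals(startKey)
                 ? base.forPaging(lastReturned, inclusive)
                 : base;
        }
    }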

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DecoratedKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DecoratedKey.java b/src/java/org/apache/cassandra/db/DecoratedKey.java
index cc62a15..92d6414 100644
--- a/src/java/org/apache/cassandra/db/DecoratedKey.java
+++ b/src/java/org/apache/cassandra/db/DecoratedKey.java
@@ -36,7 +36,7 @@ import org.apache.cassandra.utils.IFilter.FilterKey;
  * if this matters, you can subclass RP to use a stronger hash, or use a non-lossy tokenization scheme (as in the
  * OrderPreservingPartitioner classes).
  */
-public abstract class DecoratedKey implements RowPosition, FilterKey
+public abstract class DecoratedKey implements PartitionPosition, FilterKey
 {
     public static final Comparator<DecoratedKey> comparator = new Comparator<DecoratedKey>()
     {
@@ -72,7 +72,7 @@ public abstract class DecoratedKey implements RowPosition, FilterKey
         return ByteBufferUtil.compareUnsigned(getKey(), other.getKey()) == 0; // we compare faster than BB.equals for array backed BB
     }
 
-    public int compareTo(RowPosition pos)
+    public int compareTo(PartitionPosition pos)
     {
         if (this == pos)
             return 0;
@@ -86,7 +86,7 @@ public abstract class DecoratedKey implements RowPosition, FilterKey
         return cmp == 0 ? ByteBufferUtil.compareUnsigned(getKey(), otherKey.getKey()) : cmp;
     }
 
-    public static int compareTo(IPartitioner partitioner, ByteBuffer key, RowPosition position)
+    public static int compareTo(IPartitioner partitioner, ByteBuffer key, PartitionPosition position)
     {
         // delegate to Token.KeyBound if needed
         if (!(position instanceof DecoratedKey))
@@ -113,9 +113,9 @@ public abstract class DecoratedKey implements RowPosition, FilterKey
         return false;
     }
 
-    public RowPosition.Kind kind()
+    public PartitionPosition.Kind kind()
     {
-        return RowPosition.Kind.ROW_KEY;
+        return PartitionPosition.Kind.ROW_KEY;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedCell.java b/src/java/org/apache/cassandra/db/DeletedCell.java
deleted file mode 100644
index 998c409..0000000
--- a/src/java/org/apache/cassandra/db/DeletedCell.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-
-public interface DeletedCell extends Cell
-{
-    DeletedCell localCopy(CFMetaData metadata, AbstractAllocator allocator);
-
-    DeletedCell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup);
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 048324a..e54d6b1 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -17,40 +17,32 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
-import java.io.IOException;
-import java.security.MessageDigest;
-import java.util.Comparator;
 import java.util.Iterator;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.cache.IMeasurableMemory;
-import org.apache.cassandra.db.composites.CType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
- * A combination of a top-level (or row) tombstone and range tombstones describing the deletions
- * within a {@link ColumnFamily} (or row).
+ * A combination of a top-level (partition) tombstone and range tombstones describing the deletions
+ * within a partition.
  */
 public class DeletionInfo implements IMeasurableMemory
 {
     private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionInfo(0, 0));
 
     /**
-     * This represents a deletion of the entire row.  We can't represent this within the RangeTombstoneList, so it's
-     * kept separately.  This also slightly optimizes the common case of a full row deletion.
+     * This represents a deletion of the entire partition. We can't represent this within the RangeTombstoneList, so it's
+     * kept separately. This also slightly optimizes the common case of a full partition deletion.
      */
-    private DeletionTime topLevel;
+    private DeletionTime partitionDeletion;
 
     /**
-     * A list of range tombstones within the row.  This is left as null if there are no range tombstones
+     * A list of range tombstones within the partition.  This is left as null if there are no range tombstones
-     * (to save an allocation (since it's a common case).
+     * (to save an allocation, since it's a common case).
      */
     private RangeTombstoneList ranges;
@@ -65,28 +57,23 @@ public class DeletionInfo implements IMeasurableMemory
     {
         // Pre-1.1 node may return MIN_VALUE for non-deleted container, but the new default is MAX_VALUE
         // (see CASSANDRA-3872)
-        this(new DeletionTime(markedForDeleteAt, localDeletionTime == Integer.MIN_VALUE ? Integer.MAX_VALUE : localDeletionTime));
+        this(new SimpleDeletionTime(markedForDeleteAt, localDeletionTime == Integer.MIN_VALUE ? Integer.MAX_VALUE : localDeletionTime));
     }
 
-    public DeletionInfo(DeletionTime topLevel)
+    public DeletionInfo(DeletionTime partitionDeletion)
     {
-        this(topLevel, null);
+        this(partitionDeletion, null);
     }
 
-    public DeletionInfo(Composite start, Composite end, Comparator<Composite> comparator, long markedForDeleteAt, int localDeletionTime)
+    public DeletionInfo(ClusteringComparator comparator, Slice slice, long markedForDeleteAt, int localDeletionTime)
     {
         this(DeletionTime.LIVE, new RangeTombstoneList(comparator, 1));
-        ranges.add(start, end, markedForDeleteAt, localDeletionTime);
+        ranges.add(slice.start(), slice.end(), markedForDeleteAt, localDeletionTime);
     }
 
-    public DeletionInfo(RangeTombstone rangeTombstone, Comparator<Composite> comparator)
+    public DeletionInfo(DeletionTime partitionDeletion, RangeTombstoneList ranges)
     {
-        this(rangeTombstone.min, rangeTombstone.max, comparator, rangeTombstone.data.markedForDeleteAt, rangeTombstone.data.localDeletionTime);
-    }
-
-    private DeletionInfo(DeletionTime topLevel, RangeTombstoneList ranges)
-    {
-        this.topLevel = topLevel;
+        this.partitionDeletion = partitionDeletion.takeAlias();
         this.ranges = ranges;
     }
 
@@ -100,17 +87,16 @@ public class DeletionInfo implements IMeasurableMemory
 
     public DeletionInfo copy()
     {
-        return new DeletionInfo(topLevel, ranges == null ? null : ranges.copy());
+        return new DeletionInfo(partitionDeletion, ranges == null ? null : ranges.copy());
     }
 
     public DeletionInfo copy(AbstractAllocator allocator)
     {
-
         RangeTombstoneList rangesCopy = null;
         if (ranges != null)
              rangesCopy = ranges.copy(allocator);
 
-        return new DeletionInfo(topLevel, rangesCopy);
+        return new DeletionInfo(partitionDeletion, rangesCopy);
     }
 
     /**
@@ -118,106 +104,31 @@ public class DeletionInfo implements IMeasurableMemory
      */
     public boolean isLive()
     {
-        return topLevel.isLive() && (ranges == null || ranges.isEmpty());
+        return partitionDeletion.isLive() && (ranges == null || ranges.isEmpty());
     }
 
     /**
-     * Return whether a given cell is deleted by the container having this deletion info.
+     * Return whether a given cell is deleted by this deletion info.
      *
+     * @param clustering the clustering for the cell to check.
      * @param cell the cell to check.
      * @return true if the cell is deleted, false otherwise
      */
-    public boolean isDeleted(Cell cell)
+    private boolean isDeleted(Clustering clustering, Cell cell)
     {
-        // We do rely on this test: if topLevel.markedForDeleteAt is MIN_VALUE, we should not
-        // consider the column deleted even if timestamp=MIN_VALUE, otherwise this breaks QueryFilter.isRelevant
+        // If we're live, don't consider anything deleted, even if the cell ends up having Long.MIN_VALUE as its timestamp
+        // (which shouldn't happen in practice, but it would be invalid to consider it deleted if it does).
         if (isLive())
             return false;
 
-        if (cell.timestamp() <= topLevel.markedForDeleteAt)
+        if (cell.livenessInfo().timestamp() <= partitionDeletion.markedForDeleteAt())
             return true;
 
         // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346.
-        if (!topLevel.isLive() && cell instanceof CounterCell)
-            return true;
-
-        return ranges != null && ranges.isDeleted(cell);
-    }
-
-    /**
-     * Returns a new {@link InOrderTester} in forward order.
-     */
-    public InOrderTester inOrderTester()
-    {
-        return inOrderTester(false);
-    }
-
-    /**
-     * Returns a new {@link InOrderTester} given the order in which
-     * columns will be passed to it.
-     */
-    public InOrderTester inOrderTester(boolean reversed)
-    {
-        return new InOrderTester(reversed);
-    }
-
-    /**
-     * Purge every tombstone that is older than {@code gcBefore}.
-     *
-     * @param gcBefore timestamp (in seconds) before which tombstones should be purged
-     */
-    public void purge(int gcBefore)
-    {
-        topLevel = topLevel.localDeletionTime < gcBefore ? DeletionTime.LIVE : topLevel;
-
-        if (ranges != null)
-        {
-            ranges.purge(gcBefore);
-            if (ranges.isEmpty())
-                ranges = null;
-        }
-    }
-
-    /**
-     * Evaluates difference between this deletion info and superset for read repair
-     *
-     * @return the difference between the two, or LIVE if no difference
-     */
-    public DeletionInfo diff(DeletionInfo superset)
-    {
-        RangeTombstoneList rangeDiff = superset.ranges == null || superset.ranges.isEmpty()
-                                     ? null
-                                     : ranges == null ? superset.ranges : ranges.diff(superset.ranges);
-
-        return topLevel.markedForDeleteAt != superset.topLevel.markedForDeleteAt || rangeDiff != null
-             ? new DeletionInfo(superset.topLevel, rangeDiff)
-             : DeletionInfo.live();
-    }
-
-
-    /**
-     * Digests deletion info. Used to trigger read repair on mismatch.
-     */
-    public void updateDigest(MessageDigest digest)
-    {
-        if (topLevel.markedForDeleteAt != Long.MIN_VALUE)
-            digest.update(ByteBufferUtil.bytes(topLevel.markedForDeleteAt));
-
-        if (ranges != null)
-            ranges.updateDigest(digest);
-    }
-
-    /**
-     * Returns true if {@code purge} would remove the top-level tombstone or any of the range
-     * tombstones, false otherwise.
-     * @param gcBefore timestamp (in seconds) before which tombstones should be purged
-     */
-    public boolean hasPurgeableTombstones(int gcBefore)
-    {
-        if (topLevel.localDeletionTime < gcBefore)
+        if (!partitionDeletion.isLive() && cell.isCounterCell())
             return true;
 
-        return ranges != null && ranges.hasPurgeableTombstones(gcBefore);
+        return ranges != null && ranges.isDeleted(clustering, cell);
     }
 
     /**
@@ -227,11 +138,11 @@ public class DeletionInfo implements IMeasurableMemory
      */
     public void add(DeletionTime newInfo)
     {
-        if (topLevel.markedForDeleteAt < newInfo.markedForDeleteAt)
-            topLevel = newInfo;
+        if (newInfo.supersedes(partitionDeletion))
+            partitionDeletion = newInfo;
     }
 
-    public void add(RangeTombstone tombstone, Comparator<Composite> comparator)
+    public void add(RangeTombstone tombstone, ClusteringComparator comparator)
     {
         if (ranges == null)
             ranges = new RangeTombstoneList(comparator, 1);
@@ -248,7 +159,7 @@ public class DeletionInfo implements IMeasurableMemory
      */
     public DeletionInfo add(DeletionInfo newInfo)
     {
-        add(newInfo.topLevel);
+        add(newInfo.partitionDeletion);
 
         if (ranges == null)
             ranges = newInfo.ranges == null ? null : newInfo.ranges.copy();
@@ -258,53 +169,30 @@ public class DeletionInfo implements IMeasurableMemory
         return this;
     }
 
-    /**
-     * Returns the minimum timestamp in any of the range tombstones or the top-level tombstone.
-     */
-    public long minTimestamp()
-    {
-        return ranges == null
-             ? topLevel.markedForDeleteAt
-             : Math.min(topLevel.markedForDeleteAt, ranges.minMarkedAt());
-    }
-
-    /**
-     * Returns the maximum timestamp in any of the range tombstones or the top-level tombstone.
-     */
-    public long maxTimestamp()
-    {
-        return ranges == null
-             ? topLevel.markedForDeleteAt
-             : Math.max(topLevel.markedForDeleteAt, ranges.maxMarkedAt());
-    }
-
-    /**
-     * Returns the top-level (or "row") tombstone.
-     */
-    public DeletionTime getTopLevelDeletion()
+    public DeletionTime getPartitionDeletion()
     {
-        return topLevel;
+        return partitionDeletion;
     }
 
     // Use sparingly, not the most efficient thing
-    public Iterator<RangeTombstone> rangeIterator()
+    public Iterator<RangeTombstone> rangeIterator(boolean reversed)
     {
-        return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator();
+        return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator(reversed);
     }
 
-    public Iterator<RangeTombstone> rangeIterator(Composite start, Composite finish)
+    public Iterator<RangeTombstone> rangeIterator(Slice slice, boolean reversed)
     {
-        return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator(start, finish);
+        return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator(slice, reversed);
     }
 
-    public RangeTombstone rangeCovering(Composite name)
+    public RangeTombstone rangeCovering(Clustering name)
     {
         return ranges == null ? null : ranges.search(name);
     }
 
     public int dataSize()
     {
-        int size = TypeSizes.NATIVE.sizeof(topLevel.markedForDeleteAt);
+        int size = TypeSizes.NATIVE.sizeof(partitionDeletion.markedForDeleteAt());
         return size + (ranges == null ? 0 : ranges.dataSize());
     }
 
@@ -323,45 +211,43 @@ public class DeletionInfo implements IMeasurableMemory
      */
     public boolean mayModify(DeletionInfo delInfo)
     {
-        return topLevel.compareTo(delInfo.topLevel) > 0 || hasRanges();
+        return partitionDeletion.compareTo(delInfo.partitionDeletion) > 0 || hasRanges();
     }
 
     @Override
     public String toString()
     {
         if (ranges == null || ranges.isEmpty())
-            return String.format("{%s}", topLevel);
+            return String.format("{%s}", partitionDeletion);
         else
-            return String.format("{%s, ranges=%s}", topLevel, rangesAsString());
+            return String.format("{%s, ranges=%s}", partitionDeletion, rangesAsString());
     }
 
     private String rangesAsString()
     {
         assert !ranges.isEmpty();
         StringBuilder sb = new StringBuilder();
-        CType type = (CType)ranges.comparator();
-        assert type != null;
-        Iterator<RangeTombstone> iter = rangeIterator();
+        ClusteringComparator cc = ranges.comparator();
+        Iterator<RangeTombstone> iter = rangeIterator(false);
         while (iter.hasNext())
         {
             RangeTombstone i = iter.next();
-            sb.append("[");
-            sb.append(type.getString(i.min)).append("-");
-            sb.append(type.getString(i.max)).append(", ");
-            sb.append(i.data);
-            sb.append("]");
+            sb.append(i.deletedSlice().toString(cc));
+            sb.append("@");
+            sb.append(i.deletionTime());
         }
         return sb.toString();
     }
 
    // Updates the timestamps of all the deletions contained in this DeletionInfo to be {@code timestamp}.
-    public void updateAllTimestamp(long timestamp)
+    public DeletionInfo updateAllTimestamp(long timestamp)
     {
-        if (topLevel.markedForDeleteAt != Long.MIN_VALUE)
-            topLevel = new DeletionTime(timestamp, topLevel.localDeletionTime);
+        if (partitionDeletion.markedForDeleteAt() != Long.MIN_VALUE)
+            partitionDeletion = new SimpleDeletionTime(timestamp, partitionDeletion.localDeletionTime());
 
         if (ranges != null)
             ranges.updateAllTimestamp(timestamp);
+        return this;
     }
 
     @Override
@@ -370,100 +256,18 @@ public class DeletionInfo implements IMeasurableMemory
         if(!(o instanceof DeletionInfo))
             return false;
         DeletionInfo that = (DeletionInfo)o;
-        return topLevel.equals(that.topLevel) && Objects.equal(ranges, that.ranges);
+        return partitionDeletion.equals(that.partitionDeletion) && Objects.equal(ranges, that.ranges);
     }
 
     @Override
     public final int hashCode()
     {
-        return Objects.hashCode(topLevel, ranges);
+        return Objects.hashCode(partitionDeletion, ranges);
     }
 
     @Override
     public long unsharedHeapSize()
     {
-        return EMPTY_SIZE + topLevel.unsharedHeapSize() + (ranges == null ? 0 : ranges.unsharedHeapSize());
-    }
-
-    public static class Serializer implements IVersionedSerializer<DeletionInfo>
-    {
-        private final RangeTombstoneList.Serializer rtlSerializer;
-
-        public Serializer(CType type)
-        {
-            this.rtlSerializer = new RangeTombstoneList.Serializer(type);
-        }
-
-        public void serialize(DeletionInfo info, DataOutputPlus out, int version) throws IOException
-        {
-            DeletionTime.serializer.serialize(info.topLevel, out);
-            rtlSerializer.serialize(info.ranges, out, version);
-        }
-
-        public DeletionInfo deserialize(DataInput in, int version) throws IOException
-        {
-            DeletionTime topLevel = DeletionTime.serializer.deserialize(in);
-            RangeTombstoneList ranges = rtlSerializer.deserialize(in, version);
-            return new DeletionInfo(topLevel, ranges);
-        }
-
-        public long serializedSize(DeletionInfo info, TypeSizes typeSizes, int version)
-        {
-            long size = DeletionTime.serializer.serializedSize(info.topLevel, typeSizes);
-            return size + rtlSerializer.serializedSize(info.ranges, typeSizes, version);
-        }
-
-        public long serializedSize(DeletionInfo info, int version)
-        {
-            return serializedSize(info, TypeSizes.NATIVE, version);
-        }
-    }
-
-    /**
-     * This object allows testing whether a given column (name/timestamp) is deleted
-     * or not by this DeletionInfo, assuming that the columns given to this
-     * object are passed in forward or reversed comparator sorted order.
-     *
-     * This is more efficient than calling DeletionInfo.isDeleted() repeatedly
-     * in that case.
-     */
-    public class InOrderTester
-    {
-        /*
-         * Note that because range tombstones are added to this DeletionInfo while we iterate,
-         * `ranges` may be null initially and we need to wait for the first range to create the tester (once
-         * created, the tester will pick up new tombstones, however). We are guaranteed that a range tombstone
-         * will be added *before* we test any column that it may delete, so this is ok.
-         */
-        private RangeTombstoneList.InOrderTester tester;
-        private final boolean reversed;
-
-        private InOrderTester(boolean reversed)
-        {
-            this.reversed = reversed;
-        }
-
-        public boolean isDeleted(Cell cell)
-        {
-            if (cell.timestamp() <= topLevel.markedForDeleteAt)
-                return true;
-
-            // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346.
-            if (!topLevel.isLive() && cell instanceof CounterCell)
-                return true;
-
-            /*
-             * We don't optimize the reversed case for now because RangeTombstoneList
-             * is always in forward sorted order.
-             */
-            if (reversed)
-                 return DeletionInfo.this.isDeleted(cell);
-
-            // Maybe create the tester if we hadn't yet and we now have some ranges (see above).
-            if (tester == null && ranges != null)
-                tester = ranges.inOrderTester();
-
-            return tester != null && tester.isDeleted(cell);
-        }
+        return EMPTY_SIZE + partitionDeletion.unsharedHeapSize() + (ranges == null ? 0 : ranges.unsharedHeapSize());
     }
 }

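For readers following the isDeleted() rework above, here is a minimal, self-contained Java sketch of the resolution order it implements. The class and parameter names are illustrative stand-ins, not the Cassandra API, and the sketch elides the LivenessInfo and range-tombstone machinery (a boolean stands in for the RangeTombstoneList lookup).

    public final class DeletionInfoSketch
    {
        // Mirrors DeletionInfo.isDeleted(): the partition-level tombstone shadows
        // anything written at or before it; a counter cell is shadowed by any
        // partition tombstone regardless of timestamps (CASSANDRA-7346); range
        // tombstones are consulted last.
        static boolean isDeleted(long partitionDeletionTimestamp, // Long.MIN_VALUE when live
                                 boolean isCounterCell,
                                 long cellTimestamp,
                                 boolean coveredByRangeTombstone)
        {
            boolean partitionIsLive = partitionDeletionTimestamp == Long.MIN_VALUE;

            if (!partitionIsLive && cellTimestamp <= partitionDeletionTimestamp)
                return true;

            if (!partitionIsLive && isCounterCell)
                return true;

            return coveredByRangeTombstone;
        }

        public static void main(String[] args)
        {
            // A regular cell written after the partition deletion survives...
            System.out.println(isDeleted(10L, false, 11L, false)); // false
            // ...but a counter cell under the same tombstone does not.
            System.out.println(isDeleted(10L, true, 11L, false));  // true
        }
    }
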
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 7165417..f070778 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -19,58 +19,56 @@ package org.apache.cassandra.db;
 
 import java.io.DataInput;
 import java.io.IOException;
+import java.security.MessageDigest;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 
 import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
-import org.codehaus.jackson.annotate.JsonIgnore;
 
 /**
- * A top-level (row) tombstone.
+ * Information on deletion of a storage engine object.
  */
-public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
+public abstract class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory, Aliasable<DeletionTime>
 {
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionTime(0, 0));
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleDeletionTime(0, 0));
 
     /**
      * A special DeletionTime that signifies that there is no top-level (row) tombstone.
      */
-    public static final DeletionTime LIVE = new DeletionTime(Long.MIN_VALUE, Integer.MAX_VALUE);
+    public static final DeletionTime LIVE = new SimpleDeletionTime(Long.MIN_VALUE, Integer.MAX_VALUE);
+
+    public static final Serializer serializer = new Serializer();
 
     /**
      * A timestamp (typically in microseconds since the unix epoch, although this is not enforced) at or before
      * which data should be considered deleted. If set to Long.MIN_VALUE, this implies that the data has not been marked
      * for deletion at all.
      */
-    public final long markedForDeleteAt;
+    public abstract long markedForDeleteAt();
 
     /**
      * The local server timestamp, in seconds since the unix epoch, at which this tombstone was created. This is
      * only used for purposes of purging the tombstone after gc_grace_seconds have elapsed.
      */
-    public final int localDeletionTime;
-
-    public static final Serializer serializer = new Serializer();
-
-    @VisibleForTesting
-    public DeletionTime(long markedForDeleteAt, int localDeletionTime)
-    {
-        this.markedForDeleteAt = markedForDeleteAt;
-        this.localDeletionTime = localDeletionTime;
-    }
+    public abstract int localDeletionTime();
 
     /**
      * Returns whether this DeletionTime is live, that is, deletes no columns.
      */
-    @JsonIgnore
     public boolean isLive()
     {
-        return markedForDeleteAt == Long.MIN_VALUE && localDeletionTime == Integer.MAX_VALUE;
+        return markedForDeleteAt() == Long.MIN_VALUE && localDeletionTime() == Integer.MAX_VALUE;
+    }
+
+    public void digest(MessageDigest digest)
+    {
+        FBUtilities.updateWithLong(digest, markedForDeleteAt());
+        FBUtilities.updateWithInt(digest, localDeletionTime());
     }
 
     @Override
@@ -79,48 +77,58 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
         if(!(o instanceof DeletionTime))
             return false;
         DeletionTime that = (DeletionTime)o;
-        return markedForDeleteAt == that.markedForDeleteAt && localDeletionTime == that.localDeletionTime;
+        return markedForDeleteAt() == that.markedForDeleteAt() && localDeletionTime() == that.localDeletionTime();
     }
 
     @Override
     public final int hashCode()
     {
-        return Objects.hashCode(markedForDeleteAt, localDeletionTime);
+        return Objects.hashCode(markedForDeleteAt(), localDeletionTime());
     }
 
     @Override
     public String toString()
     {
-        return String.format("deletedAt=%d, localDeletion=%d", markedForDeleteAt, localDeletionTime);
+        return String.format("deletedAt=%d, localDeletion=%d", markedForDeleteAt(), localDeletionTime());
     }
 
     public int compareTo(DeletionTime dt)
     {
-        if (markedForDeleteAt < dt.markedForDeleteAt)
+        if (markedForDeleteAt() < dt.markedForDeleteAt())
             return -1;
-        else if (markedForDeleteAt > dt.markedForDeleteAt)
+        else if (markedForDeleteAt() > dt.markedForDeleteAt())
             return 1;
-        else if (localDeletionTime < dt.localDeletionTime)
+        else if (localDeletionTime() < dt.localDeletionTime())
             return -1;
-        else if (localDeletionTime > dt.localDeletionTime)
+        else if (localDeletionTime() > dt.localDeletionTime())
             return 1;
         else
             return 0;
     }
 
-    public boolean isGcAble(int gcBefore)
+    public boolean supersedes(DeletionTime dt)
     {
-        return localDeletionTime < gcBefore;
+        return markedForDeleteAt() > dt.markedForDeleteAt() || (markedForDeleteAt() == dt.markedForDeleteAt() && localDeletionTime() > dt.localDeletionTime());
     }
 
-    public boolean isDeleted(OnDiskAtom atom)
+    public boolean isPurgeable(long maxPurgeableTimestamp, int gcBefore)
     {
-        return atom.timestamp() <= markedForDeleteAt;
+        return markedForDeleteAt() < maxPurgeableTimestamp && localDeletionTime() < gcBefore;
     }
 
-    public boolean supersedes(DeletionTime dt)
+    public boolean deletes(LivenessInfo info)
+    {
+        return deletes(info.timestamp());
+    }
+
+    public boolean deletes(long timestamp)
+    {
+        return timestamp <= markedForDeleteAt();
+    }
+
+    public int dataSize()
     {
-        return this.markedForDeleteAt > dt.markedForDeleteAt;
+        return 12; // markedForDeleteAt (long, 8 bytes) + localDeletionTime (int, 4 bytes)
     }
 
     public long unsharedHeapSize()
@@ -132,8 +140,8 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
     {
         public void serialize(DeletionTime delTime, DataOutputPlus out) throws IOException
         {
-            out.writeInt(delTime.localDeletionTime);
-            out.writeLong(delTime.markedForDeleteAt);
+            out.writeInt(delTime.localDeletionTime());
+            out.writeLong(delTime.markedForDeleteAt());
         }
 
         public DeletionTime deserialize(DataInput in) throws IOException
@@ -142,7 +150,7 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
             long mfda = in.readLong();
             return mfda == Long.MIN_VALUE && ldt == Integer.MAX_VALUE
                  ? LIVE
-                 : new DeletionTime(mfda, ldt);
+                 : new SimpleDeletionTime(mfda, ldt);
         }
 
         public void skip(DataInput in) throws IOException
@@ -152,8 +160,8 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
 
         public long serializedSize(DeletionTime delTime, TypeSizes typeSizes)
         {
-            return typeSizes.sizeof(delTime.localDeletionTime)
-                 + typeSizes.sizeof(delTime.markedForDeleteAt);
+            return typeSizes.sizeof(delTime.localDeletionTime())
+                 + typeSizes.sizeof(delTime.markedForDeleteAt());
         }
     }
 }

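The new supersedes(), deletes() and isPurgeable() predicates above encode three distinct rules that are easy to mix up. Below is a hedged sketch of the three side by side, with plain long/int values standing in for a DeletionTime; the names are illustrative only.

    public final class DeletionTimeRulesSketch
    {
        // A deletion supersedes another if its timestamp is strictly higher,
        // with localDeletionTime as the tiebreaker.
        static boolean supersedes(long mfdaA, int ldtA, long mfdaB, int ldtB)
        {
            return mfdaA > mfdaB || (mfdaA == mfdaB && ldtA > ldtB);
        }

        // A deletion shadows any write with timestamp <= markedForDeleteAt.
        static boolean deletes(long markedForDeleteAt, long writeTimestamp)
        {
            return writeTimestamp <= markedForDeleteAt;
        }

        // A tombstone is purgeable once it shadows nothing that may still exist
        // and gc_grace_seconds have elapsed.
        static boolean isPurgeable(long mfda, int ldt, long maxPurgeableTimestamp, int gcBefore)
        {
            return mfda < maxPurgeableTimestamp && ldt < gcBefore;
        }

        public static void main(String[] args)
        {
            System.out.println(supersedes(5L, 100, 5L, 200)); // false: tie broken by local deletion time
            System.out.println(deletes(10L, 10L));            // true: equal timestamps are shadowed
        }
    }
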
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/DeletionTimeArray.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTimeArray.java b/src/java/org/apache/cassandra/db/DeletionTimeArray.java
new file mode 100644
index 0000000..77eb953
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DeletionTimeArray.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.Arrays;
+
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * Utility class to store an array of deletion times compactly, as parallel primitive arrays.
+ */
+public class DeletionTimeArray
+{
+    private long[] markedForDeleteAts;
+    private int[] delTimes;
+
+    public DeletionTimeArray(int initialCapacity)
+    {
+        this.markedForDeleteAts = new long[initialCapacity];
+        this.delTimes = new int[initialCapacity];
+        clear();
+    }
+
+    public void clear(int i)
+    {
+        markedForDeleteAts[i] = Long.MIN_VALUE;
+        delTimes[i] = Integer.MAX_VALUE;
+    }
+
+    public void set(int i, DeletionTime dt)
+    {
+        this.markedForDeleteAts[i] = dt.markedForDeleteAt();
+        this.delTimes[i] = dt.localDeletionTime();
+    }
+
+    public int size()
+    {
+        return markedForDeleteAts.length;
+    }
+
+    public void resize(int newSize)
+    {
+        int prevSize = size();
+
+        markedForDeleteAts = Arrays.copyOf(markedForDeleteAts, newSize);
+        delTimes = Arrays.copyOf(delTimes, newSize);
+
+        Arrays.fill(markedForDeleteAts, prevSize, newSize, Long.MIN_VALUE);
+        Arrays.fill(delTimes, prevSize, newSize, Integer.MAX_VALUE);
+    }
+
+    // Note: unlike DeletionTime.supersedes(), this does not break timestamp ties on localDeletionTime.
+    public boolean supersedes(int i, DeletionTime dt)
+    {
+        return markedForDeleteAts[i] > dt.markedForDeleteAt();
+    }
+
+    public boolean supersedes(int i, int j)
+    {
+        return markedForDeleteAts[i] > markedForDeleteAts[j];
+    }
+
+    public void swap(int i, int j)
+    {
+        long m = markedForDeleteAts[j];
+        int l = delTimes[j];
+
+        move(i, j);
+
+        markedForDeleteAts[i] = m;
+        delTimes[i] = l;
+    }
+
+    public void move(int i, int j)
+    {
+        markedForDeleteAts[j] = markedForDeleteAts[i];
+        delTimes[j] = delTimes[i];
+    }
+
+    // Note: unlike DeletionTime.isLive(), this returns whether slot i holds an
+    // actual (non-cleared) deletion time.
+    public boolean isLive(int i)
+    {
+        return markedForDeleteAts[i] > Long.MIN_VALUE;
+    }
+
+    public void clear()
+    {
+        Arrays.fill(markedForDeleteAts, Long.MIN_VALUE);
+        Arrays.fill(delTimes, Integer.MAX_VALUE);
+    }
+
+    public int dataSize()
+    {
+        return 12 * markedForDeleteAts.length; // 8 bytes (long) + 4 bytes (int) per entry
+    }
+
+    public long unsharedHeapSize()
+    {
+        return ObjectSizes.sizeOfArray(markedForDeleteAts)
+             + ObjectSizes.sizeOfArray(delTimes);
+    }
+
+    public void copy(DeletionTimeArray other)
+    {
+        assert size() == other.size();
+        for (int i = 0; i < size(); i++)
+        {
+            markedForDeleteAts[i] = other.markedForDeleteAts[i];
+            delTimes[i] = other.delTimes[i];
+        }
+    }
+
+    public static class Cursor extends DeletionTime
+    {
+        private DeletionTimeArray array;
+        private int i;
+
+        public Cursor setTo(DeletionTimeArray array, int i)
+        {
+            this.array = array;
+            this.i = i;
+            return this;
+        }
+
+        public long markedForDeleteAt()
+        {
+            return array.markedForDeleteAts[i];
+        }
+
+        public int localDeletionTime()
+        {
+            return array.delTimes[i];
+        }
+
+        public DeletionTime takeAlias()
+        {
+            return new SimpleDeletionTime(markedForDeleteAt(), localDeletionTime());
+        }
+    }
+}

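DeletionTimeArray.Cursor above is a flyweight: one mutable DeletionTime view is repositioned over the parallel long[]/int[] arrays, so iterating deletion times allocates nothing until a caller retains a value via takeAlias(). A stripped-down sketch of the pattern follows; the names are illustrative, not the real classes.

    public final class FlyweightCursorSketch
    {
        static final class View
        {
            private long[] mfdas;
            private int[] ldts;
            private int i;

            // Reposition the same view instead of allocating one object per entry.
            View setTo(long[] mfdas, int[] ldts, int i)
            {
                this.mfdas = mfdas;
                this.ldts = ldts;
                this.i = i;
                return this;
            }

            long markedForDeleteAt() { return mfdas[i]; }
            int localDeletionTime()  { return ldts[i]; }

            // Analogue of takeAlias(): copy the values out when they must
            // outlive the cursor's current position.
            long[] takeAlias() { return new long[]{ mfdas[i], ldts[i] }; }
        }

        public static void main(String[] args)
        {
            long[] mfdas = { 3L, 7L };
            int[] ldts = { 100, 200 };
            View cursor = new View(); // a single allocation for the whole iteration
            for (int i = 0; i < mfdas.length; i++)
            {
                cursor.setTo(mfdas, ldts, i);
                System.out.println(cursor.markedForDeleteAt() + "@" + cursor.localDeletionTime());
            }
        }
    }
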
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringCell.java b/src/java/org/apache/cassandra/db/ExpiringCell.java
deleted file mode 100644
index 5fc0f94..0000000
--- a/src/java/org/apache/cassandra/db/ExpiringCell.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-
-/**
- * Alternative to Cell that has an expiring time.
- * ExpiringCell is immutable (as Cell is).
- *
- * Note that ExpiringCell does not override Cell.getMarkedForDeleteAt,
- * which means that it's in the somewhat unintuitive position of being deleted (after its expiration)
- * without having a time-at-which-it-became-deleted.  (Because ttl is a server-side measurement,
- * we can't mix it with the timestamp field, which is client-supplied and whose resolution we
- * can't assume anything about.)
- */
-public interface ExpiringCell extends Cell
-{
-    public static final int MAX_TTL = 20 * 365 * 24 * 60 * 60; // 20 years in seconds
-
-    public int getTimeToLive();
-
-    ExpiringCell localCopy(CFMetaData metadata, AbstractAllocator allocator);
-
-    ExpiringCell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup);
-}
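
The javadoc deleted above explains why a TTL cannot be folded into the client-supplied timestamp: expiration is measured against the server clock. As a simplified illustration (assumed names, not the replacement rows.Cell API), TTL liveness reduces to comparing the server's current time in seconds with a locally computed expiration time.

    public final class TtlSketch
    {
        static final int MAX_TTL = 20 * 365 * 24 * 60 * 60; // 20 years in seconds, as in the removed interface

        // Expiration derives from the server clock at write time, never from the
        // client-supplied timestamp; long arithmetic avoids int overflow.
        static long localExpirationTime(long nowInSec, int ttlInSec)
        {
            if (ttlInSec <= 0 || ttlInSec > MAX_TTL)
                throw new IllegalArgumentException("ttl must be in (0, " + MAX_TTL + "]");
            return nowInSec + ttlInSec;
        }

        static boolean isLive(long nowInSec, long localExpirationTime)
        {
            return nowInSec < localExpirationTime;
        }

        public static void main(String[] args)
        {
            long now = System.currentTimeMillis() / 1000;
            long expiresAt = localExpirationTime(now, 3600); // one-hour TTL
            System.out.println(isLive(now, expiresAt)); // true for the next hour
        }
    }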