You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:01 UTC

[37/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
deleted file mode 100644
index 664eeee..0000000
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.common.base.Objects;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.filter.ExtendedFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.pager.Pageable;
-
-public class RangeSliceCommand extends AbstractRangeCommand implements Pageable
-{
-    public static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
-
-    public final int maxResults;
-    public final boolean countCQL3Rows;
-    public final boolean isPaging;
-
-    public RangeSliceCommand(String keyspace,
-                             String columnFamily,
-                             long timestamp,
-                             IDiskAtomFilter predicate,
-                             AbstractBounds<RowPosition> range,
-                             int maxResults)
-    {
-        this(keyspace, columnFamily, timestamp, predicate, range, null, maxResults, false, false);
-    }
-
-    public RangeSliceCommand(String keyspace,
-                             String columnFamily,
-                             long timestamp,
-                             IDiskAtomFilter predicate,
-                             AbstractBounds<RowPosition> range,
-                             List<IndexExpression> row_filter,
-                             int maxResults)
-    {
-        this(keyspace, columnFamily, timestamp, predicate, range, row_filter, maxResults, false, false);
-    }
-
-    public RangeSliceCommand(String keyspace,
-                             String columnFamily,
-                             long timestamp,
-                             IDiskAtomFilter predicate,
-                             AbstractBounds<RowPosition> range,
-                             List<IndexExpression> rowFilter,
-                             int maxResults,
-                             boolean countCQL3Rows,
-                             boolean isPaging)
-    {
-        super(keyspace, columnFamily, timestamp, range, predicate, rowFilter);
-        this.maxResults = maxResults;
-        this.countCQL3Rows = countCQL3Rows;
-        this.isPaging = isPaging;
-    }
-
-    public MessageOut<RangeSliceCommand> createMessage()
-    {
-        return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, serializer);
-    }
-
-    public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange)
-    {
-        return new RangeSliceCommand(keyspace,
-                                     columnFamily,
-                                     timestamp,
-                                     predicate.cloneShallow(),
-                                     subRange,
-                                     rowFilter,
-                                     maxResults,
-                                     countCQL3Rows,
-                                     isPaging);
-    }
-
-    public AbstractRangeCommand withUpdatedLimit(int newLimit)
-    {
-        return new RangeSliceCommand(keyspace,
-                                     columnFamily,
-                                     timestamp,
-                                     predicate.cloneShallow(),
-                                     keyRange,
-                                     rowFilter,
-                                     newLimit,
-                                     countCQL3Rows,
-                                     isPaging);
-    }
-
-    public int limit()
-    {
-        return maxResults;
-    }
-
-    public boolean countCQL3Rows()
-    {
-        return countCQL3Rows;
-    }
-
-    public List<Row> executeLocally()
-    {
-        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
-
-        ExtendedFilter exFilter = cfs.makeExtendedFilter(keyRange, predicate, rowFilter, maxResults, countCQL3Rows, isPaging, timestamp);
-        if (cfs.indexManager.hasIndexFor(rowFilter))
-            return cfs.search(exFilter);
-        else
-            return cfs.getRangeSlice(exFilter);
-    }
-
-    @Override
-    public String toString()
-    {
-        return Objects.toStringHelper(this)
-                      .add("keyspace", keyspace)
-                      .add("columnFamily", columnFamily)
-                      .add("predicate", predicate)
-                      .add("keyRange", keyRange)
-                      .add("rowFilter", rowFilter)
-                      .add("maxResults", maxResults)
-                      .add("counterCQL3Rows", countCQL3Rows)
-                      .add("timestamp", timestamp)
-                      .toString();
-    }
-}
-
-class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceCommand>
-{
-    public void serialize(RangeSliceCommand sliceCommand, DataOutputPlus out, int version) throws IOException
-    {
-        out.writeUTF(sliceCommand.keyspace);
-        out.writeUTF(sliceCommand.columnFamily);
-        out.writeLong(sliceCommand.timestamp);
-
-        CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.keyspace, sliceCommand.columnFamily);
-
-        metadata.comparator.diskAtomFilterSerializer().serialize(sliceCommand.predicate, out, version);
-
-        if (sliceCommand.rowFilter == null)
-        {
-            out.writeInt(0);
-        }
-        else
-        {
-            out.writeInt(sliceCommand.rowFilter.size());
-            for (IndexExpression expr : sliceCommand.rowFilter)
-            {
-                expr.writeTo(out);
-            }
-        }
-        MessagingService.validatePartitioner(sliceCommand.keyRange);
-        AbstractBounds.rowPositionSerializer.serialize(sliceCommand.keyRange, out, version);
-        out.writeInt(sliceCommand.maxResults);
-        out.writeBoolean(sliceCommand.countCQL3Rows);
-        out.writeBoolean(sliceCommand.isPaging);
-    }
-
-    public RangeSliceCommand deserialize(DataInput in, int version) throws IOException
-    {
-        String keyspace = in.readUTF();
-        String columnFamily = in.readUTF();
-        long timestamp = in.readLong();
-
-        CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
-        if (metadata == null)
-        {
-            String message = String.format("Got range slice command for nonexistent table %s.%s.  If the table was just " +
-                    "created, this is likely due to the schema not being fully propagated.  Please wait for schema " +
-                    "agreement on table creation." , keyspace, columnFamily);
-            throw new UnknownColumnFamilyException(message, null);
-        }
-
-        IDiskAtomFilter predicate = metadata.comparator.diskAtomFilterSerializer().deserialize(in, version);
-
-        List<IndexExpression> rowFilter;
-        int filterCount = in.readInt();
-        rowFilter = new ArrayList<>(filterCount);
-        for (int i = 0; i < filterCount; i++)
-        {
-            rowFilter.add(IndexExpression.readFrom(in));
-        }
-        AbstractBounds<RowPosition> range = AbstractBounds.rowPositionSerializer.deserialize(in, MessagingService.globalPartitioner(), version);
-
-        int maxResults = in.readInt();
-        boolean countCQL3Rows = in.readBoolean();
-        boolean isPaging = in.readBoolean();
-        return new RangeSliceCommand(keyspace, columnFamily, timestamp, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging);
-    }
-
-    public long serializedSize(RangeSliceCommand rsc, int version)
-    {
-        long size = TypeSizes.NATIVE.sizeof(rsc.keyspace);
-        size += TypeSizes.NATIVE.sizeof(rsc.columnFamily);
-        size += TypeSizes.NATIVE.sizeof(rsc.timestamp);
-
-        CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.columnFamily);
-
-        IDiskAtomFilter filter = rsc.predicate;
-
-        size += metadata.comparator.diskAtomFilterSerializer().serializedSize(filter, version);
-
-        if (rsc.rowFilter == null)
-        {
-            size += TypeSizes.NATIVE.sizeof(0);
-        }
-        else
-        {
-            size += TypeSizes.NATIVE.sizeof(rsc.rowFilter.size());
-            for (IndexExpression expr : rsc.rowFilter)
-            {
-                size += TypeSizes.NATIVE.sizeofWithShortLength(expr.column);
-                size += TypeSizes.NATIVE.sizeof(expr.operator.ordinal());
-                size += TypeSizes.NATIVE.sizeofWithShortLength(expr.value);
-            }
-        }
-        size += AbstractBounds.rowPositionSerializer.serializedSize(rsc.keyRange, version);
-        size += TypeSizes.NATIVE.sizeof(rsc.maxResults);
-        size += TypeSizes.NATIVE.sizeof(rsc.countCQL3Rows);
-        size += TypeSizes.NATIVE.sizeof(rsc.isPaging);
-        return size;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RangeSliceReply.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceReply.java b/src/java/org/apache/cassandra/db/RangeSliceReply.java
deleted file mode 100644
index ed1f523..0000000
--- a/src/java/org/apache/cassandra/db/RangeSliceReply.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-
-public class RangeSliceReply
-{
-    public static final RangeSliceReplySerializer serializer = new RangeSliceReplySerializer();
-
-    public final List<Row> rows;
-
-    public RangeSliceReply(List<Row> rows)
-    {
-        this.rows = rows;
-    }
-
-    public MessageOut<RangeSliceReply> createMessage()
-    {
-        return new MessageOut<RangeSliceReply>(MessagingService.Verb.REQUEST_RESPONSE, this, serializer);
-    }
-
-    @Override
-    public String toString()
-    {
-        return "RangeSliceReply{" +
-               "rows=" + StringUtils.join(rows, ",") +
-               '}';
-    }
-
-    public static RangeSliceReply read(byte[] body, int version) throws IOException
-    {
-        try (DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(body)))
-        {
-            return serializer.deserialize(dis, version);
-        }
-    }
-
-    private static class RangeSliceReplySerializer implements IVersionedSerializer<RangeSliceReply>
-    {
-        public void serialize(RangeSliceReply rsr, DataOutputPlus out, int version) throws IOException
-        {
-            out.writeInt(rsr.rows.size());
-            for (Row row : rsr.rows)
-                Row.serializer.serialize(row, out, version);
-        }
-
-        public RangeSliceReply deserialize(DataInput in, int version) throws IOException
-        {
-            int rowCount = in.readInt();
-            List<Row> rows = new ArrayList<Row>(rowCount);
-            for (int i = 0; i < rowCount; i++)
-                rows.add(Row.serializer.deserialize(in, version));
-            return new RangeSliceReply(rows);
-        }
-
-        public long serializedSize(RangeSliceReply rsr, int version)
-        {
-            int size = TypeSizes.NATIVE.sizeof(rsr.rows.size());
-            for (Row row : rsr.rows)
-                size += Row.serializer.serializedSize(row, version);
-            return size;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index da483fc..3373afa 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -19,12 +19,12 @@ package org.apache.cassandra.db;
 
 import java.io.DataInput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CType;
-import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ISSTableSerializer;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -32,350 +32,142 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.Interval;
 
-public class RangeTombstone extends Interval<Composite, DeletionTime> implements OnDiskAtom
+/**
+ * A range tombstone is a tombstone that covers a slice/range of rows.
+ * <p>
+ * Note that in most of the storage engine, a range tombstone is actually represented by its separated
+ * opening and closing bound, see {@link RangeTombstoneMarker}. So in practice, this is only used when
+ * full partitions are materialized in memory in a {@code Partition} object, and more precisely through
+ * the use of a {@code RangeTombstoneList} in a {@code DeletionInfo} object.
+ */
+public class RangeTombstone
 {
-    public RangeTombstone(Composite start, Composite stop, long markedForDeleteAt, int localDeletionTime)
-    {
-        this(start, stop, new DeletionTime(markedForDeleteAt, localDeletionTime));
-    }
-
-    public RangeTombstone(Composite start, Composite stop, DeletionTime delTime)
-    {
-        super(start, stop, delTime);
-    }
-
-    public Composite name()
-    {
-        return min;
-    }
+    private final Slice slice;
+    private final DeletionTime deletion;
 
-    public int getLocalDeletionTime()
+    public RangeTombstone(Slice slice, DeletionTime deletion)
     {
-        return data.localDeletionTime;
+        this.slice = slice;
+        this.deletion = deletion.takeAlias();
     }
 
-    public long timestamp()
+    /**
+     * The slice of rows that is deleted by this range tombstone.
+     *
+     * @return the slice of rows that is deleted by this range tombstone.
+     */
+    public Slice deletedSlice()
     {
-        return data.markedForDeleteAt;
+        return slice;
     }
 
-    public void validateFields(CFMetaData metadata) throws MarshalException
+    /**
+     * The deletion time for this (range) tombstone.
+     *
+     * @return the deletion time for this range tombstone.
+     */
+    public DeletionTime deletionTime()
     {
-        metadata.comparator.validate(min);
-        metadata.comparator.validate(max);
+        return deletion;
     }
 
-    public void updateDigest(MessageDigest digest)
+    @Override
+    public boolean equals(Object other)
     {
-        digest.update(min.toByteBuffer().duplicate());
-        digest.update(max.toByteBuffer().duplicate());
-
-        try (DataOutputBuffer buffer = new DataOutputBuffer())
-        {
-            buffer.writeLong(data.markedForDeleteAt);
-            digest.update(buffer.getData(), 0, buffer.getLength());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * This tombstone supersedes another one if it is more recent and cover a
-     * bigger range than rt.
-     */
-    public boolean supersedes(RangeTombstone rt, Comparator<Composite> comparator)
-    {
-        if (rt.data.markedForDeleteAt > data.markedForDeleteAt)
+        if(!(other instanceof RangeTombstone))
             return false;
 
-        return comparator.compare(min, rt.min) <= 0 && comparator.compare(max, rt.max) >= 0;
+        RangeTombstone that = (RangeTombstone)other;
+        return this.deletedSlice().equals(that.deletedSlice())
+            && this.deletionTime().equals(that.deletionTime());
     }
 
-    public boolean includes(Comparator<Composite> comparator, Composite name)
+    @Override
+    public int hashCode()
     {
-        return comparator.compare(name, min) >= 0 && comparator.compare(name, max) <= 0;
+        return Objects.hash(deletedSlice(), deletionTime());
     }
 
     /**
-     * Tracks opened RangeTombstones when iterating over a partition.
+     * The bound of a range tombstone.
      * <p>
-     * This tracker must be provided all the atoms of a given partition in
-     * order (to the {@code update} method). Given this, it keeps enough
-     * information to be able to decide if one of an atom is deleted (shadowed)
-     * by a previously open RT. One the tracker can prove a given range
-     * tombstone cannot be useful anymore (that is, as soon as we've seen an
-     * atom that is after the end of that RT), it discards this RT. In other
-     * words, the maximum memory used by this object should be proportional to
-     * the maximum number of RT that can be simultaneously open (and this
-     * should fairly low in practice).
+     * This is the same than for a slice but it includes "boundaries" between ranges. A boundary simply condensed
+     * a close and an opening "bound" into a single object. There is 2 main reasons for these "shortcut" boundaries:
+     *   1) When merging multiple iterators having range tombstones (that are represented by their start and end markers),
+     *      we need to know when a range is close on an iterator, if it is reopened right away. Otherwise, we cannot
+     *      easily produce the markers on the merged iterators within risking to fail the sorting guarantees of an
+     *      iterator. See this comment for more details: https://goo.gl/yyB5mR.
+     *   2) This saves some storage space.
      */
-    public static class Tracker
+    public static class Bound extends Slice.Bound
     {
-        private final Comparator<Composite> comparator;
-
-        // A list the currently open RTs. We keep the list sorted in order of growing end bounds as for a
-        // new atom, this allows to efficiently find the RTs that are now useless (if any). Also note that because
-        // atom are passed to the tracker in order, any RT that is tracked can be assumed as opened, i.e. we
-        // never have to test the RTs start since it's always assumed to be less than what we have.
-        // Also note that this will store expired RTs (#7810). Those will be of type ExpiredRangeTombstone and
-        // will be ignored by writeOpenedMarker.
-        private final List<RangeTombstone> openedTombstones = new LinkedList<RangeTombstone>();
-
-        // Total number of atoms written by writeOpenedMarker().
-        private int atomCount;
+        public static final Serializer serializer = new Serializer();
 
-        /**
-         * Creates a new tracker given the table comparator.
-         *
-         * @param comparator the comparator for the table this will track atoms
-         * for. The tracker assumes that atoms will be later provided to the
-         * tracker in {@code comparator} order.
-         */
-        public Tracker(Comparator<Composite> comparator)
+        public Bound(Kind kind, ByteBuffer[] values)
         {
-            this.comparator = comparator;
+            super(kind, values);
         }
 
-        /**
-         * Computes the RangeTombstone that are needed at the beginning of an index
-         * block starting with {@code firstColumn}.
-         *
-         * @return the total serialized size of said tombstones and write them to
-         * {@code out} it if isn't null.
-         */
-        public long writeOpenedMarker(OnDiskAtom firstColumn, DataOutputPlus out, OnDiskAtom.Serializer atomSerializer) throws IOException
+        public static RangeTombstone.Bound inclusiveOpen(boolean reversed, ByteBuffer[] boundValues)
         {
-            long size = 0;
-            if (openedTombstones.isEmpty())
-                return size;
-
-            /*
-             * Compute the markers that needs to be written at the beginning of
-             * this block. We need to write one if it is the more recent
-             * (opened) tombstone for at least some part of its range.
-             */
-            List<RangeTombstone> toWrite = new LinkedList<RangeTombstone>();
-            outer:
-            for (RangeTombstone tombstone : openedTombstones)
-            {
-                // If the first column is outside the range, skip it (in case update() hasn't been called yet)
-                if (comparator.compare(firstColumn.name(), tombstone.max) > 0)
-                    continue;
-
-                if (tombstone instanceof ExpiredRangeTombstone)
-                    continue;
-
-                RangeTombstone updated = new RangeTombstone(firstColumn.name(), tombstone.max, tombstone.data);
-
-                Iterator<RangeTombstone> iter = toWrite.iterator();
-                while (iter.hasNext())
-                {
-                    RangeTombstone other = iter.next();
-                    if (other.supersedes(updated, comparator))
-                        break outer;
-                    if (updated.supersedes(other, comparator))
-                        iter.remove();
-                }
-                toWrite.add(tombstone);
-            }
-
-            for (RangeTombstone tombstone : toWrite)
-            {
-                size += atomSerializer.serializedSizeForSSTable(tombstone);
-                atomCount++;
-                if (out != null)
-                    atomSerializer.serializeForSSTable(tombstone, out);
-            }
-            return size;
-        }
-
-        /**
-         * The total number of atoms written by calls to the method {@link #writeOpenedMarker}.
-         */
-        public int writtenAtom()
-        {
-            return atomCount;
-        }
-
-        /**
-         * Update this tracker given an {@code atom}.
-         * <p>
-         * This method first test if some range tombstone can be discarded due
-         * to the knowledge of that new atom. Then, if it's a range tombstone,
-         * it adds it to the tracker.
-         * <p>
-         * Note that this method should be called on *every* atom of a partition for
-         * the tracker to work as efficiently as possible (#9486).
-         */
-        public void update(OnDiskAtom atom, boolean isExpired)
-        {
-            // Get rid of now useless RTs
-            ListIterator<RangeTombstone> iterator = openedTombstones.listIterator();
-            while (iterator.hasNext())
-            {
-                // If this tombstone stops before the new atom, it is now useless since it cannot cover this or any future
-                // atoms. Otherwise, if a RT ends after the new atom, then we know that's true of any following atom too
-                // since maxOrderingSet is sorted by end bounds
-                RangeTombstone t = iterator.next();
-                if (comparator.compare(atom.name(), t.max) > 0)
-                {
-                    iterator.remove();
-                }
-                else
-                {
-                    // If the atom is a RT, we'll add it next and for that we want to start by looking at the atom we just
-                    // returned, so rewind the iterator.
-                    iterator.previous();
-                    break;
-                }
-            }
-
-            // If it's a RT, adds it.
-            if (atom instanceof RangeTombstone)
-            {
-                RangeTombstone toAdd = (RangeTombstone)atom;
-                if (isExpired)
-                    toAdd = new ExpiredRangeTombstone(toAdd);
-
-                // We want to maintain openedTombstones in end bounds order so we find where to insert the new element
-                // and add it. While doing so, we also check if that new tombstone fully shadow or is fully shadowed
-                // by an existing tombstone so we avoid tracking more tombstone than necessary (and we know this will
-                // at least happend for start-of-index-block repeated range tombstones).
-                while (iterator.hasNext())
-                {
-                    RangeTombstone existing = iterator.next();
-                    int cmp = comparator.compare(toAdd.max, existing.max);
-                    if (cmp > 0)
-                    {
-                        // the new one covers more than the existing one. If the new one happens to also supersedes
-                        // the existing one, remove the existing one. In any case, we're not done yet.
-                        if (toAdd.data.supersedes(existing.data))
-                            iterator.remove();
-                    }
-                    else
-                    {
-                        // the new one is included in the existing one. If the new one supersedes the existing one,
-                        // then we add the new one (and if the new one ends like the existing one, we can actually remove
-                        // the existing one), otherwise we can actually ignore it. In any case, we're done.
-                        if (toAdd.data.supersedes(existing.data))
-                        {
-                            if (cmp == 0)
-                                iterator.set(toAdd);
-                            else
-                                insertBefore(toAdd, iterator);
-                        }
-                        return;
-                    }
-                }
-                // If we reach here, either we had no tombstones and the new one ends after all existing ones.
-                iterator.add(toAdd);
-            }
+            return new Bound(reversed ? Kind.INCL_END_BOUND : Kind.INCL_START_BOUND, boundValues);
         }
 
-        /**
-         * Adds the provided {@code tombstone} _before_ the last element returned by {@code iterator.next()}.
-         * <p>
-         * This method assumes that {@code iterator.next()} has been called prior to this method call, i.e. that
-         * {@code iterator.hasPrevious() == true}.
-         */
-        private static void insertBefore(RangeTombstone tombstone, ListIterator<RangeTombstone> iterator)
+        public static RangeTombstone.Bound exclusiveOpen(boolean reversed, ByteBuffer[] boundValues)
         {
-            assert iterator.hasPrevious();
-            iterator.previous();
-            iterator.add(tombstone);
-            iterator.next();
+            return new Bound(reversed ? Kind.EXCL_END_BOUND : Kind.EXCL_START_BOUND, boundValues);
         }
 
-        /**
-         * Tests if the provided column is deleted by one of the tombstone
-         * tracked by this tracker.
-         * <p>
-         * This method should be called on columns in the same order than for the update()
-         * method. Note that this method does not update the tracker so the update() method
-         * should still be called on {@code column} (it doesn't matter if update is called
-         * before or after this call).
-         */
-        public boolean isDeleted(Cell cell)
+        public static RangeTombstone.Bound inclusiveClose(boolean reversed, ByteBuffer[] boundValues)
         {
-            // We know every tombstone kept are "open", start before the column. So the
-            // column is deleted if any of the tracked tombstone ends after the column
-            // (this will be the case of every RT if update() has been called before this
-            // method, but we might have a few RT to skip otherwise) and the RT deletion is
-            // actually more recent than the column timestamp.
-            for (RangeTombstone tombstone : openedTombstones)
-            {
-                if (comparator.compare(cell.name(), tombstone.max) <= 0
-                    && tombstone.timestamp() >= cell.timestamp())
-                    return true;
-            }
-            return false;
+            return new Bound(reversed ? Kind.INCL_START_BOUND : Kind.INCL_END_BOUND, boundValues);
         }
 
-        /**
-         * The tracker needs to track expired range tombstone but keep tracks that they are
-         * expired, so this is what this class is used for.
-         */
-        private static class ExpiredRangeTombstone extends RangeTombstone
+        public static RangeTombstone.Bound exclusiveClose(boolean reversed, ByteBuffer[] boundValues)
         {
-            private ExpiredRangeTombstone(RangeTombstone tombstone)
-            {
-                super(tombstone.min, tombstone.max, tombstone.data);
-            }
+            return new Bound(reversed ? Kind.EXCL_START_BOUND : Kind.EXCL_END_BOUND, boundValues);
         }
-    }
-
-    public static class Serializer implements ISSTableSerializer<RangeTombstone>
-    {
-        private final CType type;
 
-        public Serializer(CType type)
+        public static RangeTombstone.Bound inclusiveCloseExclusiveOpen(boolean reversed, ByteBuffer[] boundValues)
         {
-            this.type = type;
+            return new Bound(reversed ? Kind.EXCL_END_INCL_START_BOUNDARY : Kind.INCL_END_EXCL_START_BOUNDARY, boundValues);
         }
 
-        public void serializeForSSTable(RangeTombstone t, DataOutputPlus out) throws IOException
+        public static RangeTombstone.Bound exclusiveCloseInclusiveOpen(boolean reversed, ByteBuffer[] boundValues)
         {
-            type.serializer().serialize(t.min, out);
-            out.writeByte(ColumnSerializer.RANGE_TOMBSTONE_MASK);
-            type.serializer().serialize(t.max, out);
-            DeletionTime.serializer.serialize(t.data, out);
+            return new Bound(reversed ? Kind.INCL_END_EXCL_START_BOUNDARY : Kind.EXCL_END_INCL_START_BOUNDARY, boundValues);
         }
 
-        public RangeTombstone deserializeFromSSTable(DataInput in, Version version) throws IOException
+        @Override
+        public Bound withNewKind(Kind kind)
         {
-            Composite min = type.serializer().deserialize(in);
-
-            int b = in.readUnsignedByte();
-            assert (b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0;
-            return deserializeBody(in, min, version);
+            return new Bound(kind, values);
         }
 
-        public RangeTombstone deserializeBody(DataInput in, Composite min, Version version) throws IOException
+        public static class Serializer
         {
-            Composite max = type.serializer().deserialize(in);
-            DeletionTime dt = DeletionTime.serializer.deserialize(in);
-            // If the max equals the min.end(), we can avoid keeping an extra ByteBuffer in memory by using
-            // min.end() instead of max
-            Composite minEnd = min.end();
-            max = minEnd.equals(max) ? minEnd : max;
-            return new RangeTombstone(min, max, dt);
-        }
+            public void serialize(RangeTombstone.Bound bound, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
+            {
+                out.writeByte(bound.kind().ordinal());
+                out.writeShort(bound.size());
+                ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, version, types);
+            }
 
-        public void skipBody(DataInput in, Version version) throws IOException
-        {
-            type.serializer().skip(in);
-            DeletionTime.serializer.skip(in);
-        }
+            public long serializedSize(RangeTombstone.Bound bound, int version, List<AbstractType<?>> types, TypeSizes sizes)
+            {
+                return 1 // kind ordinal
+                     + sizes.sizeof((short)bound.size())
+                     + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types, sizes);
+            }
 
-        public long serializedSizeForSSTable(RangeTombstone t)
-        {
-            TypeSizes typeSizes = TypeSizes.NATIVE;
-            return type.serializer().serializedSize(t.min, typeSizes)
-                 + 1 // serialization flag
-                 + type.serializer().serializedSize(t.max, typeSizes)
-                 + DeletionTime.serializer.serializedSize(t.data, typeSizes);
+            public Kind deserialize(DataInput in, int version, List<AbstractType<?>> types, Writer writer) throws IOException
+            {
+                Kind kind = Kind.values()[in.readByte()];
+                writer.writeBoundKind(kind);
+                int size = in.readUnsignedShort();
+                ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, writer);
+                return kind;
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index 37f1ef4..0c27bc4 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -20,9 +20,7 @@ package org.apache.cassandra.db;
 import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.MessageDigest;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Iterator;
 
 import com.google.common.collect.AbstractIterator;
@@ -31,12 +29,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.IMeasurableMemory;
-import org.apache.cassandra.db.composites.CType;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
@@ -62,19 +57,19 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
 
     private static long EMPTY_SIZE = ObjectSizes.measure(new RangeTombstoneList(null, 0));
 
-    private final Comparator<Composite> comparator;
+    private final ClusteringComparator comparator;
 
     // Note: we don't want to use a List for the markedAts and delTimes to avoid boxing. We could
     // use a List for starts and ends, but having arrays everywhere is almost simpler.
-    private Composite[] starts;
-    private Composite[] ends;
+    private Slice.Bound[] starts;
+    private Slice.Bound[] ends;
     private long[] markedAts;
     private int[] delTimes;
 
     private long boundaryHeapSize;
     private int size;
 
-    private RangeTombstoneList(Comparator<Composite> comparator, Composite[] starts, Composite[] ends, long[] markedAts, int[] delTimes, long boundaryHeapSize, int size)
+    private RangeTombstoneList(ClusteringComparator comparator, Slice.Bound[] starts, Slice.Bound[] ends, long[] markedAts, int[] delTimes, long boundaryHeapSize, int size)
     {
         assert starts.length == ends.length && starts.length == markedAts.length && starts.length == delTimes.length;
         this.comparator = comparator;
@@ -86,9 +81,9 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         this.boundaryHeapSize = boundaryHeapSize;
     }
 
-    public RangeTombstoneList(Comparator<Composite> comparator, int capacity)
+    public RangeTombstoneList(ClusteringComparator comparator, int capacity)
     {
-        this(comparator, new Composite[capacity], new Composite[capacity], new long[capacity], new int[capacity], 0, 0);
+        this(comparator, new Slice.Bound[capacity], new Slice.Bound[capacity], new long[capacity], new int[capacity], 0, 0);
     }
 
     public boolean isEmpty()
@@ -101,7 +96,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         return size;
     }
 
-    public Comparator<Composite> comparator()
+    public ClusteringComparator comparator()
     {
         return comparator;
     }
@@ -119,27 +114,36 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
     public RangeTombstoneList copy(AbstractAllocator allocator)
     {
         RangeTombstoneList copy =  new RangeTombstoneList(comparator,
-                                      new Composite[size],
-                                      new Composite[size],
-                                      Arrays.copyOf(markedAts, size),
-                                      Arrays.copyOf(delTimes, size),
-                                      boundaryHeapSize, size);
+                                                          new Slice.Bound[size],
+                                                          new Slice.Bound[size],
+                                                          Arrays.copyOf(markedAts, size),
+                                                          Arrays.copyOf(delTimes, size),
+                                                          boundaryHeapSize, size);
 
 
         for (int i = 0; i < size; i++)
         {
-            assert !(starts[i] instanceof AbstractNativeCell || ends[i] instanceof AbstractNativeCell); //this should never happen
-
-            copy.starts[i] = starts[i].copy(null, allocator);
-            copy.ends[i] = ends[i].copy(null, allocator);
+            copy.starts[i] = clone(starts[i], allocator);
+            copy.ends[i] = clone(ends[i], allocator);
         }
 
         return copy;
     }
 
+    private static Slice.Bound clone(Slice.Bound bound, AbstractAllocator allocator)
+    {
+        ByteBuffer[] values = new ByteBuffer[bound.size()];
+        for (int i = 0; i < values.length; i++)
+            values[i] = allocator.clone(bound.get(i));
+        return new Slice.Bound(bound.kind(), values);
+    }
+
     public void add(RangeTombstone tombstone)
     {
-        add(tombstone.min, tombstone.max, tombstone.data.markedForDeleteAt, tombstone.data.localDeletionTime);
+        add(tombstone.deletedSlice().start(),
+            tombstone.deletedSlice().end(),
+            tombstone.deletionTime().markedForDeleteAt(),
+            tombstone.deletionTime().localDeletionTime());
     }
 
     /**
@@ -148,7 +152,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
      * This method will be faster if the new tombstone sort after all the currently existing ones (this is a common use case),
      * but it doesn't assume it.
      */
-    public void add(Composite start, Composite end, long markedAt, int delTime)
+    public void add(Slice.Bound start, Slice.Bound end, long markedAt, int delTime)
     {
         if (isEmpty())
         {
@@ -215,7 +219,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
             int j = 0;
             while (i < size && j < tombstones.size)
             {
-                if (comparator.compare(tombstones.starts[j], ends[i]) <= 0)
+                if (comparator.compare(tombstones.starts[j], ends[i]) < 0)
                 {
                     insertFrom(i, tombstones.starts[j], tombstones.ends[j], tombstones.markedAts[j], tombstones.delTimes[j]);
                     j++;
@@ -235,34 +239,26 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
      * Returns whether the given name/timestamp pair is deleted by one of the tombstone
      * of this RangeTombstoneList.
      */
-    public boolean isDeleted(Cell cell)
+    public boolean isDeleted(Clustering clustering, Cell cell)
     {
-        int idx = searchInternal(cell.name(), 0);
+        int idx = searchInternal(clustering, 0, size);
         // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346.
-        return idx >= 0 && (cell instanceof CounterCell || markedAts[idx] >= cell.timestamp());
-    }
-
-    /**
-     * Returns a new {@link InOrderTester}.
-     */
-    InOrderTester inOrderTester()
-    {
-        return new InOrderTester();
+        return idx >= 0 && (cell.isCounterCell() || markedAts[idx] >= cell.livenessInfo().timestamp());
     }
 
     /**
      * Returns the DeletionTime for the tombstone overlapping {@code name} (there can't be more than one),
      * or null if {@code name} is not covered by any tombstone.
      */
-    public DeletionTime searchDeletionTime(Composite name)
+    public DeletionTime searchDeletionTime(Clustering name)
     {
-        int idx = searchInternal(name, 0);
-        return idx < 0 ? null : new DeletionTime(markedAts[idx], delTimes[idx]);
+        int idx = searchInternal(name, 0, size);
+        return idx < 0 ? null : new SimpleDeletionTime(markedAts[idx], delTimes[idx]);
     }
 
-    public RangeTombstone search(Composite name)
+    public RangeTombstone search(Clustering name)
     {
-        int idx = searchInternal(name, 0);
+        int idx = searchInternal(name, 0, size);
         return idx < 0 ? null : rangeTombstone(idx);
     }
 
@@ -270,20 +266,15 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
      * Return is the index of the range covering name if name is covered. If the return idx is negative,
      * no range cover name and -idx-1 is the index of the first range whose start is greater than name.
      */
-    private int searchInternal(Composite name, int startIdx)
+    private int searchInternal(ClusteringPrefix name, int startIdx, int endIdx)
     {
         if (isEmpty())
             return -1;
 
-        int pos = Arrays.binarySearch(starts, startIdx, size, name, comparator);
+        int pos = Arrays.binarySearch(starts, startIdx, endIdx, name, comparator);
         if (pos >= 0)
         {
-            // We're exactly on an interval start. The one subtility is that we need to check if
-            // the previous is not equal to us and doesn't have a higher marked at
-            if (pos > 0 && comparator.compare(name, ends[pos-1]) == 0 && markedAts[pos-1] > markedAts[pos])
-                return pos-1;
-            else
-                return pos;
+            return pos;
         }
         else
         {
@@ -308,93 +299,94 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         return dataSize;
     }
 
-    public long minMarkedAt()
+    public void updateAllTimestamp(long timestamp)
     {
-        long min = Long.MAX_VALUE;
         for (int i = 0; i < size; i++)
-            min = Math.min(min, markedAts[i]);
-        return min;
+            markedAts[i] = timestamp;
     }
 
-    public long maxMarkedAt()
+    private RangeTombstone rangeTombstone(int idx)
     {
-        long max = Long.MIN_VALUE;
-        for (int i = 0; i < size; i++)
-            max = Math.max(max, markedAts[i]);
-        return max;
+        return new RangeTombstone(Slice.make(starts[idx], ends[idx]), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
     }
 
-    public void updateAllTimestamp(long timestamp)
+    private RangeTombstone rangeTombstoneWithNewStart(int idx, Slice.Bound newStart)
     {
-        for (int i = 0; i < size; i++)
-            markedAts[i] = timestamp;
+        return new RangeTombstone(Slice.make(newStart, ends[idx]), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
     }
 
-    /**
-     * Removes all range tombstones whose local deletion time is older than gcBefore.
-     */
-    public void purge(int gcBefore)
+    private RangeTombstone rangeTombstoneWithNewEnd(int idx, Slice.Bound newEnd)
     {
-        int j = 0;
-        for (int i = 0; i < size; i++)
-        {
-            if (delTimes[i] >= gcBefore)
-                setInternal(j++, starts[i], ends[i], markedAts[i], delTimes[i]);
-        }
-        size = j;
+        return new RangeTombstone(Slice.make(starts[idx], newEnd), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
     }
 
-    /**
-     * Returns whether {@code purge(gcBefore)} would remove something or not.
-     */
-    public boolean hasPurgeableTombstones(int gcBefore)
+    private RangeTombstone rangeTombstoneWithNewBounds(int idx, Slice.Bound newStart, Slice.Bound newEnd)
     {
-        for (int i = 0; i < size; i++)
-        {
-            if (delTimes[i] < gcBefore)
-                return true;
-        }
-        return false;
+        return new RangeTombstone(Slice.make(newStart, newEnd), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
     }
 
-    private RangeTombstone rangeTombstone(int idx)
+    public Iterator<RangeTombstone> iterator()
     {
-        return new RangeTombstone(starts[idx], ends[idx], markedAts[idx], delTimes[idx]);
+        return iterator(false);
     }
 
-    public Iterator<RangeTombstone> iterator()
+    public Iterator<RangeTombstone> iterator(boolean reversed)
     {
-        return new AbstractIterator<RangeTombstone>()
-        {
-            private int idx;
+        return reversed
+             ? new AbstractIterator<RangeTombstone>()
+             {
+                 private int idx = size - 1;
 
-            protected RangeTombstone computeNext()
-            {
-                if (idx >= size)
-                    return endOfData();
+                 protected RangeTombstone computeNext()
+                 {
+                     if (idx < 0)
+                         return endOfData();
 
-                return rangeTombstone(idx++);
-            }
-        };
+                     return rangeTombstone(idx--);
+                 }
+             }
+             : new AbstractIterator<RangeTombstone>()
+             {
+                 private int idx;
+
+                 protected RangeTombstone computeNext()
+                 {
+                     if (idx >= size)
+                         return endOfData();
+
+                     return rangeTombstone(idx++);
+                 }
+             };
     }
 
-    public Iterator<RangeTombstone> iterator(Composite from, Composite till)
+    public Iterator<RangeTombstone> iterator(final Slice slice, boolean reversed)
     {
-        int startIdx = from.isEmpty() ? 0 : searchInternal(from, 0);
+        return reversed ? reverseIterator(slice) : forwardIterator(slice);
+    }
+
+    private Iterator<RangeTombstone> forwardIterator(final Slice slice)
+    {
+        int startIdx = slice.start() == Slice.Bound.BOTTOM ? 0 : searchInternal(slice.start(), 0, size);
         final int start = startIdx < 0 ? -startIdx-1 : startIdx;
 
         if (start >= size)
             return Iterators.<RangeTombstone>emptyIterator();
 
-        int finishIdx = till.isEmpty() ? size : searchInternal(till, start);
-        // if stopIdx is the first range after 'till' we care only until the previous range
+        int finishIdx = slice.end() == Slice.Bound.TOP ? size - 1 : searchInternal(slice.end(), start, size);
+        // if stopIdx is the first range after 'slice.end()' we care only until the previous range
         final int finish = finishIdx < 0 ? -finishIdx-2 : finishIdx;
 
-        // Note: the following is true because we know 'from' is before 'till' in sorted order.
         if (start > finish)
             return Iterators.<RangeTombstone>emptyIterator();
-        else if (start == finish)
-            return Iterators.<RangeTombstone>singletonIterator(rangeTombstone(start));
+
+        if (start == finish)
+        {
+            // We want to make sure the range are stricly included within the queried slice as this
+            // make it easier to combine things when iterating over successive slices.
+            Slice.Bound s = comparator.compare(starts[start], slice.start()) < 0 ? slice.start() : starts[start];
+            Slice.Bound e = comparator.compare(slice.end(), ends[start]) < 0 ? slice.end() : ends[start];
+            return Iterators.<RangeTombstone>singletonIterator(rangeTombstoneWithNewBounds(start, s, e));
+        }
 
         return new AbstractIterator<RangeTombstone>()
         {
@@ -405,76 +397,63 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
                 if (idx >= size || idx > finish)
                     return endOfData();
 
+                // We want to make sure the range are stricly included within the queried slice as this
+                // make it easier to combine things when iterating over successive slices. This means that
+                // for the first and last range we might have to "cut" the range returned.
+                if (idx == start && comparator.compare(starts[idx], slice.start()) < 0)
+                    return rangeTombstoneWithNewStart(idx++, slice.start());
+                if (idx == finish && comparator.compare(slice.end(), ends[idx]) < 0)
+                    return rangeTombstoneWithNewEnd(idx++, slice.end());
                 return rangeTombstone(idx++);
             }
         };
     }
 
-    /**
-     * Evaluates a diff between superset (known to be all merged tombstones) and this list for read repair
-     *
-     * @return null if there is no difference
-     */
-    public RangeTombstoneList diff(RangeTombstoneList superset)
+    private Iterator<RangeTombstone> reverseIterator(final Slice slice)
     {
-        if (isEmpty())
-            return superset;
+        int startIdx = slice.end() == Slice.Bound.TOP ? 0 : searchInternal(slice.end(), 0, size);
+        // if startIdx is the first range after 'slice.end()' we care only until the previous range
+        final int start = startIdx < 0 ? -startIdx-2 : startIdx;
 
-        RangeTombstoneList diff = null;
-
-        int j = 0; // index to iterate through our own list
-        for (int i = 0; i < superset.size; i++)
-        {
-            // we can assume that this list is a subset of the superset list
-            while (j < size && comparator.compare(starts[j], superset.starts[i]) < 0)
-                j++;
+        if (start >= size)
+            return Iterators.<RangeTombstone>emptyIterator();
 
-            if (j >= size)
-            {
-                // we're at the end of our own list, add the remainder of the superset to the diff
-                if (i < superset.size)
-                {
-                    if (diff == null)
-                        diff = new RangeTombstoneList(comparator, superset.size - i);
+        int finishIdx = slice.start() == Slice.Bound.BOTTOM ? 0 : searchInternal(slice.start(), 0, start);
+        // if stopIdx is the first range after 'slice.end()' we care only until the previous range
+        final int finish = finishIdx < 0 ? -finishIdx-1 : finishIdx;
 
-                    for(int k = i; k < superset.size; k++)
-                        diff.add(superset.starts[k], superset.ends[k], superset.markedAts[k], superset.delTimes[k]);
-                }
-                return diff;
-            }
+        if (start < finish)
+            return Iterators.<RangeTombstone>emptyIterator();
 
-            // we don't care about local deletion time here, because it doesn't matter for read repair
-            if (!starts[j].equals(superset.starts[i])
-                || !ends[j].equals(superset.ends[i])
-                || markedAts[j] != superset.markedAts[i])
-            {
-                if (diff == null)
-                    diff = new RangeTombstoneList(comparator, Math.min(8, superset.size - i));
-                diff.add(superset.starts[i], superset.ends[i], superset.markedAts[i], superset.delTimes[i]);
-            }
+        if (start == finish)
+        {
+            // We want to make sure the range are stricly included within the queried slice as this
+            // make it easier to combine things when iterator over successive slices.
+            Slice.Bound s = comparator.compare(starts[start], slice.start()) < 0 ? slice.start() : starts[start];
+            Slice.Bound e = comparator.compare(slice.end(), ends[start]) < 0 ? slice.end() : ends[start];
+            return Iterators.<RangeTombstone>singletonIterator(rangeTombstoneWithNewBounds(start, s, e));
         }
 
-        return diff;
-    }
-    
-    /**
-     * Calculates digest for triggering read repair on mismatch
-     */
-    public void updateDigest(MessageDigest digest)
-    {
-        ByteBuffer longBuffer = ByteBuffer.allocate(8);
-        for (int i = 0; i < size; i++)
+        return new AbstractIterator<RangeTombstone>()
         {
-            for (int j = 0; j < starts[i].size(); j++)
-                digest.update(starts[i].get(j).duplicate());
-            for (int j = 0; j < ends[i].size(); j++)
-                digest.update(ends[i].get(j).duplicate());
+            private int idx = start;
 
-            longBuffer.putLong(0, markedAts[i]);
-            digest.update(longBuffer.array(), 0, 8);
-        }
-    }
+            protected RangeTombstone computeNext()
+            {
+                if (idx < 0 || idx < finish)
+                    return endOfData();
 
+                // We want to make sure the range are stricly included within the queried slice as this
+                // make it easier to combine things when iterator over successive slices. This means that
+                // for the first and last range we might have to "cut" the range returned.
+                if (idx == start && comparator.compare(slice.end(), ends[idx]) < 0)
+                    return rangeTombstoneWithNewEnd(idx--, slice.end());
+                if (idx == finish && comparator.compare(starts[idx], slice.start()) < 0)
+                    return rangeTombstoneWithNewStart(idx--, slice.start());
+                return rangeTombstone(idx++);
+            }
+        };
+    }
 
     @Override
     public boolean equals(Object o)
@@ -525,51 +504,24 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
 
     /*
      * Inserts a new element starting at index i. This method assumes that:
-     *    ends[i-1] <= start <= ends[i]
+     *    ends[i-1] < start < ends[i]
+     * (note that we cannot have start == end since both will at least have a different bound "kind")
      *
      * A RangeTombstoneList is a list of range [s_0, e_0]...[s_n, e_n] such that:
      *   - s_i <= e_i
-     *   - e_i <= s_i+1
-     *   - if s_i == e_i and e_i == s_i+1 then s_i+1 < e_i+1
-     * Basically, range are non overlapping except for their bound and in order. And while
-     * we allow ranges with the same value for the start and end, we don't allow repeating
-     * such range (so we can't have [0, 0][0, 0] even though it would respect the first 2
-     * conditions).
-     *
+     *   - e_i < s_i+1
+     * Basically, range are non overlapping and in order.
      */
-    private void insertFrom(int i, Composite start, Composite end, long markedAt, int delTime)
+    private void insertFrom(int i, Slice.Bound start, Slice.Bound end, long markedAt, int delTime)
     {
         while (i < size)
         {
-            assert i == 0 || comparator.compare(ends[i-1], start) <= 0;
+            assert start.isStart() && end.isEnd();
+            assert i == 0 || comparator.compare(ends[i-1], start) < 0;
+            assert comparator.compare(start, ends[i]) < 0;
 
-            int c = comparator.compare(start, ends[i]);
-            assert c <= 0;
-            if (c == 0)
-            {
-                // If start == ends[i], then we can insert from the next one (basically the new element
-                // really start at the next element), except for the case where starts[i] == ends[i].
-                // In this latter case, if we were to move to next element, we could end up with ...[x, x][x, x]...
-                if (comparator.compare(starts[i], ends[i]) == 0)
-                {
-                    // The current element cover a single value which is equal to the start of the inserted
-                    // element. If the inserted element overwrites the current one, just remove the current
-                    // (it's included in what we insert) and proceed with the insert.
-                    if (markedAt > markedAts[i])
-                    {
-                        removeInternal(i);
-                        continue;
-                    }
-
-                    // Otherwise (the current singleton interval override the new one), we want to leave the
-                    // current element and move to the next, unless start == end since that means the new element
-                    // is in fact fully covered by the current one (so we're done)
-                    if (comparator.compare(start, end) == 0)
-                        return;
-                }
-                i++;
-                continue;
-            }
+            if (Slice.isEmpty(comparator, start, end))
+                return;
 
             // Do we overwrite the current element?
             if (markedAt > markedAts[i])
@@ -579,26 +531,24 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
                 // First deal with what might come before the newly added one.
                 if (comparator.compare(starts[i], start) < 0)
                 {
-                    addInternal(i, starts[i], start, markedAts[i], delTimes[i]);
-                    i++;
-                    // We don't need to do the following line, but in spirit that's what we want to do
-                    // setInternal(i, start, ends[i], markedAts, delTime])
+                    Slice.Bound newEnd = start.invert();
+                    if (!Slice.isEmpty(comparator, starts[i], newEnd))
+                    {
+                        addInternal(i, starts[i], start.invert(), markedAts[i], delTimes[i]);
+                        i++;
+                        setInternal(i, start, ends[i], markedAts[i], delTimes[i]);
+                    }
                 }
 
                 // now, start <= starts[i]
 
-                // Does the new element stops before/at the current one,
+                // Does the new element stops before the current one,
                 int endCmp = comparator.compare(end, starts[i]);
-                if (endCmp <= 0)
+                if (endCmp < 0)
                 {
-                    // Here start <= starts[i] and end <= starts[i]
-                    // This means the current element is before the current one. However, one special
-                    // case is if end == starts[i] and starts[i] == ends[i]. In that case,
-                    // the new element entirely overwrite the current one and we can just overwrite
-                    if (endCmp == 0 && comparator.compare(starts[i], ends[i]) == 0)
-                        setInternal(i, start, end, markedAt, delTime);
-                    else
-                        addInternal(i, start, end, markedAt, delTime);
+                    // Here start <= starts[i] and end < starts[i]
+                    // This means the current element is before the current one.
+                    addInternal(i, start, end, markedAt, delTime);
                     return;
                 }
 
@@ -617,20 +567,29 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
                         return;
                     }
 
-                    setInternal(i, start, ends[i], markedAt, delTime);
-                    if (cmp == 0)
+                    // Otherwise, the new element overwite until the min(end, next start)
+                    if (comparator.compare(end, starts[i+1]) < 0)
+                    {
+                        setInternal(i, start, end, markedAt, delTime);
+                        // We have fully handled the new element so we're done
                         return;
+                    }
 
-                    start = ends[i];
+                    setInternal(i, start, starts[i+1].invert(), markedAt, delTime);
+                    start = starts[i+1];
                     i++;
                 }
                 else
                 {
-                    // We don't ovewrite fully. Insert the new interval, and then update the now next
+                    // We don't overwrite fully. Insert the new interval, and then update the now next
                     // one to reflect the not overwritten parts. We're then done.
                     addInternal(i, start, end, markedAt, delTime);
                     i++;
-                    setInternal(i, end, ends[i], markedAts[i], delTimes[i]);
+                    Slice.Bound newStart = end.invert();
+                    if (!Slice.isEmpty(comparator, newStart, ends[i]))
+                    {
+                        setInternal(i, newStart, ends[i], markedAts[i], delTimes[i]);
+                    }
                     return;
                 }
             }
@@ -644,13 +603,17 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
                     // If we stop before the start of the current element, just insert the new
                     // interval and we're done; otherwise insert until the beginning of the
                     // current element
-                    if (comparator.compare(end, starts[i]) <= 0)
+                    if (comparator.compare(end, starts[i]) < 0)
                     {
                         addInternal(i, start, end, markedAt, delTime);
                         return;
                     }
-                    addInternal(i, start, starts[i], markedAt, delTime);
-                    i++;
+                    Slice.Bound newEnd = starts[i].invert();
+                    if (!Slice.isEmpty(comparator, start, newEnd))
+                    {
+                        addInternal(i, start, newEnd, markedAt, delTime);
+                        i++;
+                    }
                 }
 
                 // After that, we're overwritten on the current element but might have
@@ -660,7 +623,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
                 if (comparator.compare(end, ends[i]) <= 0)
                     return;
 
-                start = ends[i];
+                start = ends[i].invert();
                 i++;
             }
         }
@@ -677,7 +640,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
     /*
      * Adds the new tombstone at index i, growing and/or moving elements to make room for it.
      */
-    private void addInternal(int i, Composite start, Composite end, long markedAt, int delTime)
+    private void addInternal(int i, Slice.Bound start, Slice.Bound end, long markedAt, int delTime)
     {
         assert i >= 0;
 
@@ -730,12 +693,12 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         delTimes = grow(delTimes, size, newLength, i);
     }
 
-    private static Composite[] grow(Composite[] a, int size, int newLength, int i)
+    private static Slice.Bound[] grow(Slice.Bound[] a, int size, int newLength, int i)
     {
         if (i < 0 || i >= size)
             return Arrays.copyOf(a, newLength);
 
-        Composite[] newA = new Composite[newLength];
+        Slice.Bound[] newA = new Slice.Bound[newLength];
         System.arraycopy(a, 0, newA, 0, i);
         System.arraycopy(a, i, newA, i+1, size - i);
         return newA;
@@ -780,7 +743,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         starts[i] = null;
     }
 
-    private void setInternal(int i, Composite start, Composite end, long markedAt, int delTime)
+    private void setInternal(int i, Slice.Bound start, Slice.Bound end, long markedAt, int delTime)
     {
         if (starts[i] != null)
             boundaryHeapSize -= starts[i].unsharedHeapSize() + ends[i].unsharedHeapSize();
@@ -802,82 +765,91 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
                 + ObjectSizes.sizeOfArray(delTimes);
     }
 
+    // TODO: This should be moved someplace else as it shouldn't be used directly: some ranges might become
+    // complex deletion times. We'll also only need this for backward compatibility, this isn't used otherwise.
     public static class Serializer implements IVersionedSerializer<RangeTombstoneList>
     {
-        private final CType type;
+        private final LegacyLayout layout;
 
-        public Serializer(CType type)
+        public Serializer(LegacyLayout layout)
         {
-            this.type = type;
+            this.layout = layout;
         }
 
         public void serialize(RangeTombstoneList tombstones, DataOutputPlus out, int version) throws IOException
         {
-            if (tombstones == null)
-            {
-                out.writeInt(0);
-                return;
-            }
-
-            out.writeInt(tombstones.size);
-            for (int i = 0; i < tombstones.size; i++)
-            {
-                type.serializer().serialize(tombstones.starts[i], out);
-                type.serializer().serialize(tombstones.ends[i], out);
-                out.writeInt(tombstones.delTimes[i]);
-                out.writeLong(tombstones.markedAts[i]);
-            }
+            // TODO
+            throw new UnsupportedOperationException();
+            //if (tombstones == null)
+            //{
+            //    out.writeInt(0);
+            //    return;
+            //}
+
+            //out.writeInt(tombstones.size);
+            //for (int i = 0; i < tombstones.size; i++)
+            //{
+            //    layout.serializer().serialize(tombstones.starts[i], out);
+            //    layout.serializer().serialize(tombstones.ends[i], out);
+            //    out.writeInt(tombstones.delTimes[i]);
+            //    out.writeLong(tombstones.markedAts[i]);
+            //}
         }
 
         public RangeTombstoneList deserialize(DataInput in, int version) throws IOException
         {
-            int size = in.readInt();
-            if (size == 0)
-                return null;
-
-            RangeTombstoneList tombstones = new RangeTombstoneList(type, size);
-
-            for (int i = 0; i < size; i++)
-            {
-                Composite start = type.serializer().deserialize(in);
-                Composite end = type.serializer().deserialize(in);
-                int delTime =  in.readInt();
-                long markedAt = in.readLong();
-
-                if (version >= MessagingService.VERSION_20)
-                {
-                    tombstones.setInternal(i, start, end, markedAt, delTime);
-                }
-                else
-                {
-                    /*
-                     * The old implementation used to have range sorted by left value, but with potentially
-                     * overlapping range. So we need to use the "slow" path.
-                     */
-                    tombstones.add(start, end, markedAt, delTime);
-                }
-            }
-
-            // The "slow" path take care of updating the size, but not the fast one
-            if (version >= MessagingService.VERSION_20)
-                tombstones.size = size;
-            return tombstones;
+            // TODO
+            throw new UnsupportedOperationException();
+
+            //int size = in.readInt();
+            //if (size == 0)
+            //    return null;
+
+            //RangeTombstoneList tombstones = new RangeTombstoneList(layout, size);
+
+            //for (int i = 0; i < size; i++)
+            //{
+            //    Slice.Bound start = layout.serializer().deserialize(in);
+            //    Slice.Bound end = layout.serializer().deserialize(in);
+            //    int delTime =  in.readInt();
+            //    long markedAt = in.readLong();
+
+            //    if (version >= MessagingService.VERSION_20)
+            //    {
+            //        tombstones.setInternal(i, start, end, markedAt, delTime);
+            //    }
+            //    else
+            //    {
+            //        /*
+            //         * The old implementation used to have range sorted by left value, but with potentially
+            //         * overlapping range. So we need to use the "slow" path.
+            //         */
+            //        tombstones.add(start, end, markedAt, delTime);
+            //    }
+            //}
+
+            //// The "slow" path take care of updating the size, but not the fast one
+            //if (version >= MessagingService.VERSION_20)
+            //    tombstones.size = size;
+            //return tombstones;
         }
 
         public long serializedSize(RangeTombstoneList tombstones, TypeSizes typeSizes, int version)
         {
-            if (tombstones == null)
-                return typeSizes.sizeof(0);
-
-            long size = typeSizes.sizeof(tombstones.size);
-            for (int i = 0; i < tombstones.size; i++)
-            {
-                size += type.serializer().serializedSize(tombstones.starts[i], typeSizes);
-                size += type.serializer().serializedSize(tombstones.ends[i], typeSizes);
-                size += typeSizes.sizeof(tombstones.delTimes[i]);
-                size += typeSizes.sizeof(tombstones.markedAts[i]);
-            }
-            return size;
+            // TODO
+            throw new UnsupportedOperationException();
+            //if (tombstones == null)
+            //    return typeSizes.sizeof(0);
+
+            //long size = typeSizes.sizeof(tombstones.size);
+            //for (int i = 0; i < tombstones.size; i++)
+            //{
+            //    size += type.serializer().serializedSize(tombstones.starts[i], typeSizes);
+            //    size += type.serializer().serializedSize(tombstones.ends[i], typeSizes);
+            //    size += typeSizes.sizeof(tombstones.delTimes[i]);
+            //    size += typeSizes.sizeof(tombstones.markedAts[i]);
+            //}
+            //return size;
         }
 
         public long serializedSize(RangeTombstoneList tombstones, int version)
@@ -885,56 +857,4 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
             return serializedSize(tombstones, TypeSizes.NATIVE, version);
         }
     }
-
-    /**
-     * This object allow testing whether a given column (name/timestamp) is deleted
-     * or not by this RangeTombstoneList, assuming that the column given to this
-     * object are passed in (comparator) sorted order.
-     *
-     * This is more efficient that calling RangeTombstoneList.isDeleted() repeatedly
-     * in that case since we're able to take the sorted nature of the RangeTombstoneList
-     * into account.
-     */
-    public class InOrderTester
-    {
-        private int idx;
-
-        public boolean isDeleted(Cell cell)
-        {
-            CellName name = cell.name();
-            long timestamp = cell.timestamp();
-
-            while (idx < size)
-            {
-                int cmp = comparator.compare(name, starts[idx]);
-
-                if (cmp < 0)
-                {
-                    return false;
-                }
-                else if (cmp == 0)
-                {
-                    // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346.
-                    if (cell instanceof CounterCell)
-                        return true;
-
-                    // As for searchInternal, we need to check the previous end
-                    if (idx > 0 && comparator.compare(name, ends[idx-1]) == 0 && markedAts[idx-1] > markedAts[idx])
-                        return markedAts[idx-1] >= timestamp;
-                    else
-                        return markedAts[idx] >= timestamp;
-                }
-                else
-                {
-                    if (comparator.compare(name, ends[idx]) <= 0)
-                        return markedAts[idx] >= timestamp || cell instanceof CounterCell;
-                    else
-                        idx++;
-                }
-            }
-
-            return false;
-        }
-    }
-
 }