Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:58 UTC

[34/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
new file mode 100644
index 0000000..65b4e3f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
+import org.apache.cassandra.thrift.ThriftResultsMerger;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.memory.HeapAllocator;
+
+/**
+ * A read command for a slice (range) of rows within a single partition.
+ */
+public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<ClusteringIndexSliceFilter>
+{
+    public SinglePartitionSliceCommand(boolean isDigest,
+                                       boolean isForThrift,
+                                       CFMetaData metadata,
+                                       int nowInSec,
+                                       ColumnFilter columnFilter,
+                                       RowFilter rowFilter,
+                                       DataLimits limits,
+                                       DecoratedKey partitionKey,
+                                       ClusteringIndexSliceFilter clusteringIndexFilter)
+    {
+        super(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+    }
+
+    public SinglePartitionSliceCommand(CFMetaData metadata,
+                                       int nowInSec,
+                                       ColumnFilter columnFilter,
+                                       RowFilter rowFilter,
+                                       DataLimits limits,
+                                       DecoratedKey partitionKey,
+                                       ClusteringIndexSliceFilter clusteringIndexFilter)
+    {
+        this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+    }
+
+    /**
+     * Creates a new single partition slice command for the provided single slice.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param key the partition key for the partition to query.
+     * @param slice the slice of rows to query.
+     *
+     * @return a newly created read command that queries {@code slice} in {@code key}. The returned query will
+     * query every column of the table (without limit or row filtering) and be in forward order.
+     */
+    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slice slice)
+    {
+        return create(metadata, nowInSec, key, Slices.with(metadata.comparator, slice));
+    }
+
+    /**
+     * Creates a new single partition slice command for the provided slices.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param key the partition key for the partition to query.
+     * @param slices the slices of rows to query.
+     *
+     * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
+     * query every column of the table (without limit or row filtering) and be in forward order.
+     */
+    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices)
+    {
+        ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false);
+        return new SinglePartitionSliceCommand(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+    }
+
+    public SinglePartitionSliceCommand copy()
+    {
+        return new SinglePartitionSliceCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
+    }
+
+    protected UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
+    {
+        Tracing.trace("Acquiring sstable references");
+        ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(partitionKey()));
+
+        List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
+        ClusteringIndexSliceFilter filter = clusteringIndexFilter();
+
+        try
+        {
+            for (Memtable memtable : view.memtables)
+            {
+                Partition partition = memtable.getPartition(partitionKey());
+                if (partition == null)
+                    continue;
+
+                @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+                UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
+                @SuppressWarnings("resource") // same as above
+                UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
+                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
+            }
+
+            /*
+             * We can't eliminate full sstables based on the timestamp of what we've already read like
+             * in collectTimeOrderedData, but we still want to eliminate sstables whose maxTimestamp is smaller
+             * than the most recent tombstone we've read. We still rely on the sstable ordering by maxTimestamp since if
+             *   maxTimestamp_s1 > maxTimestamp_s0,
+             * we're guaranteed that s1 cannot have a row tombstone such that
+             *   timestamp(tombstone) > maxTimestamp_s0
+             * since we necessarily have
+             *   timestamp(tombstone) <= maxTimestamp_s1
+             * In other words, iterating in maxTimestamp order allows us to do our mostRecentPartitionTombstone
+             * elimination in one pass, and minimizes the number of sstables for which we read a partition tombstone.
+             */
+            int sstablesIterated = 0;
+            Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+            List<SSTableReader> skippedSSTables = null;
+            long mostRecentPartitionTombstone = Long.MIN_VALUE;
+            long minTimestamp = Long.MAX_VALUE;
+            int nonIntersectingSSTables = 0;
+
+            for (SSTableReader sstable : view.sstables)
+            {
+                minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
+                // if we've already seen a partition tombstone with a timestamp greater
+                // than the most recent update to this sstable, we can skip it
+                if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
+                    break;
+
+                if (!filter.shouldInclude(sstable))
+                {
+                    nonIntersectingSSTables++;
+                    // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
+                    if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE)
+                    {
+                        if (skippedSSTables == null)
+                            skippedSSTables = new ArrayList<>();
+                        skippedSSTables.add(sstable);
+                    }
+                    continue;
+                }
+
+                sstable.incrementReadCount();
+                @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+                UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
+                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
+                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
+                sstablesIterated++;
+            }
+
+            int includedDueToTombstones = 0;
+            // Check for partition tombstones in the skipped sstables
+            if (skippedSSTables != null)
+            {
+                for (SSTableReader sstable : skippedSSTables)
+                {
+                    if (sstable.getMaxTimestamp() <= minTimestamp)
+                        continue;
+
+                    sstable.incrementReadCount();
+                    @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is closed on exception, or through the closing of the final merged iterator
+                    UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
+                    if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
+                    {
+                        iterators.add(iter);
+                        includedDueToTombstones++;
+                        sstablesIterated++;
+                    }
+                    else
+                    {
+                        iter.close();
+                    }
+                }
+            }
+            if (Tracing.isTracing())
+                Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
+                              nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
+
+            cfs.metric.updateSSTableIterated(sstablesIterated);
+
+            if (iterators.isEmpty())
+                return UnfilteredRowIterators.emptyIterator(cfs.metadata, partitionKey(), filter.isReversed());
+
+            Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
+
+            @SuppressWarnings("resource") // closed through the closing of the merged iterator returned by this method
+            UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec());
+            if (!merged.isEmpty())
+            {
+                DecoratedKey key = merged.partitionKey();
+                cfs.metric.samplers.get(Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+            }
+
+            return merged;
+        }
+        catch (RuntimeException | Error e)
+        {
+            try
+            {
+                FBUtilities.closeAll(iterators);
+            }
+            catch (Exception suppressed)
+            {
+                e.addSuppressed(suppressed);
+            }
+            throw e;
+        }
+    }
+}
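
The maxTimestamp-ordered elimination above is easiest to see with concrete numbers. Below is a minimal standalone sketch of the same loop structure, using plain longs instead of SSTableReaders; it assumes the sort by maxTimestampComparator puts the most recently written sstable first, which is what makes the early break valid:

    // Sketch only (not part of the patch): three sstables ordered
    // most-recent-first by maxTimestamp, with the timestamp of the
    // partition tombstone found in each.
    long[] maxTimestamps       = { 100, 80, 50 };
    long[] partitionTombstones = {  90, 10,  5 };

    long mostRecentPartitionTombstone = Long.MIN_VALUE;
    int sstablesIterated = 0;
    for (int i = 0; i < maxTimestamps.length; i++)
    {
        // Once an sstable's newest write is older than a partition tombstone
        // we've already seen, it and every later sstable are fully shadowed.
        // Here 80 < 90, so only the first sstable is actually read.
        if (maxTimestamps[i] < mostRecentPartitionTombstone)
            break;
        mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, partitionTombstones[i]);
        sstablesIterated++;
    }
    // sstablesIterated == 1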

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Slice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slice.java b/src/java/org/apache/cassandra/db/Slice.java
new file mode 100644
index 0000000..dae491e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/Slice.java
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * A slice represents the selection of a range of rows.
+ * <p>
+ * A slice has a start and an end bound that are both (potentially full) clustering prefixes.
+ * A slice selects every row whose clustering is greater than the slice start prefix but smaller
+ * than the end prefix. Both start and end can be either inclusive or exclusive.
+ */
+public class Slice
+{
+    public static final Serializer serializer = new Serializer();
+
+    /** The slice selecting all rows (of a given partition) */
+    public static final Slice ALL = new Slice(Bound.BOTTOM, Bound.TOP)
+    {
+        @Override
+        public boolean selects(ClusteringComparator comparator, Clustering clustering)
+        {
+            return true;
+        }
+
+        @Override
+        public boolean intersects(ClusteringComparator comparator, List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues)
+        {
+            return true;
+        }
+
+        @Override
+        public String toString(ClusteringComparator comparator)
+        {
+            return "ALL";
+        }
+    };
+
+    private final Bound start;
+    private final Bound end;
+
+    private Slice(Bound start, Bound end)
+    {
+        assert start.isStart() && end.isEnd();
+        this.start = start.takeAlias();
+        this.end = end.takeAlias();
+    }
+
+    public static Slice make(Bound start, Bound end)
+    {
+        if (start == Bound.BOTTOM && end == Bound.TOP)
+            return ALL;
+
+        return new Slice(start, end);
+    }
+
+    public static Slice make(ClusteringComparator comparator, Object... values)
+    {
+        CBuilder builder = CBuilder.create(comparator);
+        for (int i = 0; i < values.length; i++)
+        {
+            Object val = values[i];
+            if (val instanceof ByteBuffer)
+                builder.add((ByteBuffer)val);
+            else
+                builder.add(val);
+        }
+        return new Slice(builder.buildBound(true, true), builder.buildBound(false, true));
+    }
+
+    public static Slice make(Clustering clustering)
+    {
+        // This doesn't give us what we want with the clustering prefix
+        assert clustering != Clustering.STATIC_CLUSTERING;
+        ByteBuffer[] values = extractValues(clustering);
+        return new Slice(Bound.inclusiveStartOf(values), Bound.inclusiveEndOf(values));
+    }
+
+    public static Slice make(Clustering start, Clustering end)
+    {
+        // This doesn't give us what we want with the clustering prefix
+        assert start != Clustering.STATIC_CLUSTERING && end != Clustering.STATIC_CLUSTERING;
+
+        ByteBuffer[] startValues = extractValues(start);
+        ByteBuffer[] endValues = extractValues(end);
+
+        return new Slice(Bound.inclusiveStartOf(startValues), Bound.inclusiveEndOf(endValues));
+    }
+
+    private static ByteBuffer[] extractValues(ClusteringPrefix clustering)
+    {
+        ByteBuffer[] values = new ByteBuffer[clustering.size()];
+        for (int i = 0; i < clustering.size(); i++)
+            values[i] = clustering.get(i);
+        return values;
+    }
+
+    public Bound start()
+    {
+        return start;
+    }
+
+    public Bound end()
+    {
+        return end;
+    }
+
+    public Bound open(boolean reversed)
+    {
+        return reversed ? end : start;
+    }
+
+    public Bound close(boolean reversed)
+    {
+        return reversed ? start : end;
+    }
+
+    /**
+     * Returns whether the slice is empty.
+     *
+     * @param comparator the comparator to compare the bounds.
+     * @return whether the slice is empty or not.
+     */
+    public boolean isEmpty(ClusteringComparator comparator)
+    {
+        return isEmpty(comparator, start(), end());
+    }
+
+    /**
+     * Returns whether the slice formed by the two provided bounds is empty or not.
+     *
+     * @param comparator the comparator to compare the bounds.
+     * @param start the start for the slice to consider. This must be a start bound.
+     * @param end the end for the slice to consider. This must be an end bound.
+     * @return whether the slice formed by {@code start} and {@code end} is
+     * empty or not.
+     */
+    public static boolean isEmpty(ClusteringComparator comparator, Slice.Bound start, Slice.Bound end)
+    {
+        assert start.isStart() && end.isEnd();
+        return comparator.compare(end, start) < 0;
+    }
+
+    /**
+     * Returns whether a given clustering is selected by this slice.
+     *
+     * @param comparator the comparator for the table this is a slice of.
+     * @param clustering the clustering to test inclusion of.
+     *
+     * @return whether {@code clustering} is selected by this slice.
+     */
+    public boolean selects(ClusteringComparator comparator, Clustering clustering)
+    {
+        return comparator.compare(start, clustering) <= 0 && comparator.compare(clustering, end) <= 0;
+    }
+
+    /**
+     * Returns whether a given bound is included in this slice.
+     *
+     * @param comparator the comparator for the table this is a slice of.
+     * @param bound the bound to test inclusion of.
+     *
+     * @return whether {@code bound} is within the bounds of this slice.
+     */
+    public boolean includes(ClusteringComparator comparator, Bound bound)
+    {
+        return comparator.compare(start, bound) <= 0 && comparator.compare(bound, end) <= 0;
+    }
+
+    /**
+     * Returns a slice for continuing paging from the last returned clustering prefix.
+     *
+     * @param comparator the comparator for the table this is a filter for.
+     * @param lastReturned the last clustering that was returned for the query we are paging for. The
+     * resulting slice will be such that only results coming strictly after {@code lastReturned} are returned
+     * (where coming after means "greater than" if {@code !reversed} and "lesser than" otherwise).
+     * @param inclusive whether or not we want to include the {@code lastReturned} in the newly returned page of results.
+     * @param reversed whether the query we're paging for is reversed or not.
+     *
+     * @return a new slice that selects results coming after {@code lastReturned}, or {@code null} if the
+     * resulting slice selects nothing (i.e. if this slice selects nothing coming after {@code lastReturned}).
+     */
+    public Slice forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed)
+    {
+        if (reversed)
+        {
+            int cmp = comparator.compare(lastReturned, start);
+            if (cmp < 0 || (!inclusive && cmp == 0))
+                return null;
+
+            cmp = comparator.compare(end, lastReturned);
+            if (cmp < 0 || (inclusive && cmp == 0))
+                return this;
+
+            ByteBuffer[] values = extractValues(lastReturned);
+            return new Slice(start, inclusive ? Bound.inclusiveEndOf(values) : Bound.exclusiveEndOf(values));
+        }
+        else
+        {
+            int cmp = comparator.compare(end, lastReturned);
+            if (cmp < 0 || (!inclusive && cmp == 0))
+                return null;
+
+            cmp = comparator.compare(lastReturned, start);
+            if (cmp < 0 || (inclusive && cmp == 0))
+                return this;
+
+            ByteBuffer[] values = extractValues(lastReturned);
+            return new Slice(inclusive ? Bound.inclusiveStartOf(values) : Bound.exclusiveStartOf(values), end);
+        }
+    }
+
+    /**
+     * Given the per-clustering column minimum and maximum values an sstable contains, returns whether this slice
+     * potentially intersects that sstable.
+     *
+     * @param comparator the comparator for the table this is a slice of.
+     * @param minClusteringValues the smallest values for each clustering column that a sstable contains.
+     * @param maxClusteringValues the biggest values for each clustering column that a sstable contains.
+     *
+     * @return whether the slice might intersect the sstable having {@code minClusteringValues} and
+     * {@code maxClusteringValues}.
+     */
+    public boolean intersects(ClusteringComparator comparator, List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues)
+    {
+        // If this slice starts after the max or ends before the min, it can't intersect
+        if (start.compareTo(comparator, maxClusteringValues) > 0 || end.compareTo(comparator, minClusteringValues) < 0)
+            return false;
+
+        // We could safely return true here, but there's a minor optimization: if the first component
+        // of the slice is restricted to a single value (typically the slice is [4:5, 4:7]), we can
+        // check that the second component falls within the min/max for that component (and repeat for
+        // all components).
+        for (int j = 0; j < minClusteringValues.size() && j < maxClusteringValues.size(); j++)
+        {
+            ByteBuffer s = j < start.size() ? start.get(j) : null;
+            ByteBuffer f = j < end.size() ? end.get(j) : null;
+
+            // we already know the first component falls within its min/max range (otherwise we wouldn't get here)
+            if (j > 0 && (j < end.size() && comparator.compareComponent(j, f, minClusteringValues.get(j)) < 0 ||
+                        j < start.size() && comparator.compareComponent(j, s, maxClusteringValues.get(j)) > 0))
+                return false;
+
+            // if this component isn't equal in the start and finish, we don't need to check any more
+            if (j >= start.size() || j >= end.size() || comparator.compareComponent(j, s, f) != 0)
+                break;
+        }
+        return true;
+    }
+
+    public String toString(CFMetaData metadata)
+    {
+        return toString(metadata.comparator);
+    }
+
+    public String toString(ClusteringComparator comparator)
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append(start.isInclusive() ? "[" : "(");
+        for (int i = 0; i < start.size(); i++)
+        {
+            if (i > 0)
+                sb.append(":");
+            sb.append(comparator.subtype(i).getString(start.get(i)));
+        }
+        sb.append(", ");
+        for (int i = 0; i < end.size(); i++)
+        {
+            if (i > 0)
+                sb.append(":");
+            sb.append(comparator.subtype(i).getString(end.get(i)));
+        }
+        sb.append(end.isInclusive() ? "]" : ")");
+        return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        if (!(other instanceof Slice))
+            return false;
+
+        Slice that = (Slice)other;
+        return this.start().equals(that.start())
+            && this.end().equals(that.end());
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(start(), end());
+    }
+
+    public static class Serializer
+    {
+        public void serialize(Slice slice, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
+        {
+            Bound.serializer.serialize(slice.start, out, version, types);
+            Bound.serializer.serialize(slice.end, out, version, types);
+        }
+
+        public long serializedSize(Slice slice, int version, List<AbstractType<?>> types, TypeSizes sizes)
+        {
+            return Bound.serializer.serializedSize(slice.start, version, types, sizes)
+                 + Bound.serializer.serializedSize(slice.end, version, types, sizes);
+        }
+
+        public Slice deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException
+        {
+            Bound start = Bound.serializer.deserialize(in, version, types);
+            Bound end = Bound.serializer.deserialize(in, version, types);
+            return new Slice(start, end);
+        }
+    }
+
+    /**
+     * The bound of a slice.
+     * <p>
+     * This can be either a start or an end bound, and this can be either inclusive or exclusive.
+     */
+    public static class Bound extends AbstractClusteringPrefix
+    {
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new Bound(Kind.INCL_START_BOUND, new ByteBuffer[0]));
+        public static final Serializer serializer = new Serializer();
+
+        /** The smallest start bound, i.e. the one that starts before any row. */
+        public static final Bound BOTTOM = inclusiveStartOf();
+        /** The biggest end bound, i.e. the one that ends after any row. */
+        public static final Bound TOP = inclusiveEndOf();
+
+        protected final Kind kind;
+        protected final ByteBuffer[] values;
+
+        protected Bound(Kind kind, ByteBuffer[] values)
+        {
+            this.kind = kind;
+            this.values = values;
+        }
+
+        public static Bound create(Kind kind, ByteBuffer[] values)
+        {
+            assert !kind.isBoundary();
+            return new Bound(kind, values);
+        }
+
+        public static Kind boundKind(boolean isStart, boolean isInclusive)
+        {
+            return isStart
+                 ? (isInclusive ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND)
+                 : (isInclusive ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND);
+        }
+
+        public static Bound inclusiveStartOf(ByteBuffer... values)
+        {
+            return create(Kind.INCL_START_BOUND, values);
+        }
+
+        public static Bound inclusiveEndOf(ByteBuffer... values)
+        {
+            return create(Kind.INCL_END_BOUND, values);
+        }
+
+        public static Bound exclusiveStartOf(ByteBuffer... values)
+        {
+            return create(Kind.EXCL_START_BOUND, values);
+        }
+
+        public static Bound exclusiveEndOf(ByteBuffer... values)
+        {
+            return create(Kind.EXCL_END_BOUND, values);
+        }
+
+        public static Bound exclusiveStartOf(ClusteringPrefix prefix)
+        {
+            ByteBuffer[] values = new ByteBuffer[prefix.size()];
+            for (int i = 0; i < prefix.size(); i++)
+                values[i] = prefix.get(i);
+            return exclusiveStartOf(values);
+        }
+
+        public static Bound inclusiveEndOf(ClusteringPrefix prefix)
+        {
+            ByteBuffer[] values = new ByteBuffer[prefix.size()];
+            for (int i = 0; i < prefix.size(); i++)
+                values[i] = prefix.get(i);
+            return inclusiveEndOf(values);
+        }
+
+        public static Bound create(ClusteringComparator comparator, boolean isStart, boolean isInclusive, Object... values)
+        {
+            CBuilder builder = CBuilder.create(comparator);
+            for (int i = 0; i < values.length; i++)
+            {
+                Object val = values[i];
+                if (val instanceof ByteBuffer)
+                    builder.add((ByteBuffer)val);
+                else
+                    builder.add(val);
+            }
+            return builder.buildBound(isStart, isInclusive);
+        }
+
+        public Kind kind()
+        {
+            return kind;
+        }
+
+        public int size()
+        {
+            return values.length;
+        }
+
+        public ByteBuffer get(int i)
+        {
+            return values[i];
+        }
+
+        public Bound withNewKind(Kind kind)
+        {
+            assert !kind.isBoundary();
+            return new Bound(kind, values);
+        }
+
+        public boolean isStart()
+        {
+            return kind().isStart();
+        }
+
+        public boolean isEnd()
+        {
+            return !isStart();
+        }
+
+        public boolean isInclusive()
+        {
+            return kind == Kind.INCL_START_BOUND || kind == Kind.INCL_END_BOUND;
+        }
+
+        public boolean isExclusive()
+        {
+            return kind == Kind.EXCL_START_BOUND || kind == Kind.EXCL_END_BOUND;
+        }
+
+        /**
+         * Returns the inverse of the current bound.
+         * <p>
+         * This inverts start into end (and vice-versa) and inclusive into exclusive (and vice-versa).
+         *
+         * @return the inverse of this bound. For instance, if this bound is an exclusive start, this returns
+         * an inclusive end with the same values.
+         */
+        public Slice.Bound invert()
+        {
+            return withNewKind(kind().invert());
+        }
+
+        public ByteBuffer[] getRawValues()
+        {
+            return values;
+        }
+
+        public void digest(MessageDigest digest)
+        {
+            for (int i = 0; i < size(); i++)
+                digest.update(get(i).duplicate());
+            FBUtilities.updateWithByte(digest, kind().ordinal());
+        }
+
+        public void writeTo(Slice.Bound.Writer writer)
+        {
+            super.writeTo(writer);
+            writer.writeBoundKind(kind());
+        }
+
+        // For use by intersects, it's called with the sstable bound opposite to the slice bound
+        // (so if the slice bound is a start, it's called with the max sstable bound)
+        private int compareTo(ClusteringComparator comparator, List<ByteBuffer> sstableBound)
+        {
+            for (int i = 0; i < sstableBound.size(); i++)
+            {
+                // Say the slice bound is a start. It means we're in the case where the max
+                // sstable bound is say (1:5) while the slice start is (1). So the start
+                // does start before the sstable end bound (and intersects it). It's the exact
+                // inverse with an end slice bound.
+                if (i >= size())
+                    return isStart() ? -1 : 1;
+
+                int cmp = comparator.compareComponent(i, get(i), sstableBound.get(i));
+                if (cmp != 0)
+                    return cmp;
+            }
+
+            // Say the slice bound is a start. It means we're in the case where the max
+            // sstable bound is say (1), while the slice start is (1:5). This again means
+            // that the slice starts before the end bound.
+            if (size() > sstableBound.size())
+                return isStart() ? -1 : 1;
+
+            // The slice bound is equal to the sstable bound. The result depends on whether the slice is inclusive or not
+            return isInclusive() ? 0 : (isStart() ? 1 : -1);
+        }
+
+        public String toString(CFMetaData metadata)
+        {
+            return toString(metadata.comparator);
+        }
+
+        public String toString(ClusteringComparator comparator)
+        {
+            StringBuilder sb = new StringBuilder();
+            sb.append(kind()).append("(");
+            for (int i = 0; i < size(); i++)
+            {
+                if (i > 0)
+                    sb.append(", ");
+                sb.append(comparator.subtype(i).getString(get(i)));
+            }
+            return sb.append(")").toString();
+        }
+
+        // Overriding to get a more precise type
+        @Override
+        public Bound takeAlias()
+        {
+            return this;
+        }
+
+        @Override
+        public long unsharedHeapSize()
+        {
+            return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
+        }
+
+        public long unsharedHeapSizeExcludingData()
+        {
+            return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
+        }
+
+        public static Builder builder(int size)
+        {
+            return new Builder(size);
+        }
+
+        public interface Writer extends ClusteringPrefix.Writer
+        {
+            public void writeBoundKind(Kind kind);
+        }
+
+        public static class Builder implements Writer
+        {
+            private final ByteBuffer[] values;
+            private Kind kind;
+            private int idx;
+
+            private Builder(int size)
+            {
+                this.values = new ByteBuffer[size];
+            }
+
+            public void writeClusteringValue(ByteBuffer value)
+            {
+                values[idx++] = value;
+            }
+
+            public void writeBoundKind(Kind kind)
+            {
+                this.kind = kind;
+            }
+
+            public Slice.Bound build()
+            {
+                assert idx == values.length;
+                return Slice.Bound.create(kind, values);
+            }
+        }
+
+        /**
+         * Serializer for slice bounds.
+         * <p>
+         * Unlike {@code Clustering}, a slice bound can be a true prefix of the full clustering, so we actually record
+         * its size.
+         */
+        public static class Serializer
+        {
+            public void serialize(Slice.Bound bound, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
+            {
+                out.writeByte(bound.kind().ordinal());
+                out.writeShort(bound.size());
+                ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, version, types);
+            }
+
+            public long serializedSize(Slice.Bound bound, int version, List<AbstractType<?>> types, TypeSizes sizes)
+            {
+                return 1 // kind ordinal
+                     + sizes.sizeof((short)bound.size())
+                     + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types, sizes);
+            }
+
+            public Slice.Bound deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException
+            {
+                Kind kind = Kind.values()[in.readByte()];
+                return deserializeValues(in, kind, version, types);
+            }
+
+            public Slice.Bound deserializeValues(DataInput in, Kind kind, int version, List<AbstractType<?>> types) throws IOException
+            {
+                int size = in.readUnsignedShort();
+                if (size == 0)
+                    return kind.isStart() ? BOTTOM : TOP;
+
+                Builder builder = builder(size);
+                ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, builder);
+                builder.writeBoundKind(kind);
+                return builder.build();
+            }
+
+            public void deserializeValues(DataInput in, Bound.Kind kind, int version, List<AbstractType<?>> types, Writer writer) throws IOException
+            {
+                int size = in.readUnsignedShort();
+                ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, writer);
+                writer.writeBoundKind(kind);
+            }
+
+        }
+    }
+}
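
The per-component optimization in Slice.intersects above is subtle enough to deserve a worked example. The following standalone sketch mirrors its inner loop with int components instead of ByteBuffers (illustrative only, not part of the patch); it assumes the coarse start/end bound comparison has already ruled the slice possibly intersecting:

    // Mirrors the per-component loop of Slice.intersects with int components.
    static boolean componentsIntersect(int[] start, int[] end, int[] min, int[] max)
    {
        for (int j = 0; j < min.length && j < max.length; j++)
        {
            // Once all previous components are pinned to a single value, a
            // component falling outside the sstable's [min, max] for that
            // position rules out intersection.
            if (j > 0 && (j < end.length && end[j] < min[j] ||
                          j < start.length && start[j] > max[j]))
                return false;

            // If start and end diverge on this component, later components
            // are unconstrained and nothing more can be concluded.
            if (j >= start.length || j >= end.length || start[j] != end[j])
                break;
        }
        return true;
    }

    // The inline comment's example slice [4:5, 4:7] against an sstable whose
    // per-component minimums are (4, 8) and maximums are (4, 9): both bounds
    // fix the first component to 4, and 7 < 8 on the second component, so
    // there is no intersection:
    //   componentsIntersect(new int[]{4, 5}, new int[]{4, 7},
    //                       new int[]{4, 8}, new int[]{4, 9}) // => false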

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
deleted file mode 100644
index 65eefaa..0000000
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-
-import com.google.common.base.Objects;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class SliceByNamesReadCommand extends ReadCommand
-{
-    static final SliceByNamesReadCommandSerializer serializer = new SliceByNamesReadCommandSerializer();
-
-    public final NamesQueryFilter filter;
-
-    public SliceByNamesReadCommand(String keyspaceName, ByteBuffer key, String cfName, long timestamp, NamesQueryFilter filter)
-    {
-        super(keyspaceName, key, cfName, timestamp, Type.GET_BY_NAMES);
-        this.filter = filter;
-    }
-
-    public ReadCommand copy()
-    {
-        return new SliceByNamesReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery());
-    }
-
-    public Row getRow(Keyspace keyspace)
-    {
-        DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-        return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp));
-    }
-
-    @Override
-    public String toString()
-    {
-        return Objects.toStringHelper(this)
-                      .add("ksName", ksName)
-                      .add("cfName", cfName)
-                      .add("key", ByteBufferUtil.bytesToHex(key))
-                      .add("filter", filter)
-                      .add("timestamp", timestamp)
-                      .toString();
-    }
-
-    public IDiskAtomFilter filter()
-    {
-        return filter;
-    }
-}
-
-class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadCommand>
-{
-    public void serialize(ReadCommand cmd, DataOutputPlus out, int version) throws IOException
-    {
-        SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
-        out.writeBoolean(command.isDigestQuery());
-        out.writeUTF(command.ksName);
-        ByteBufferUtil.writeWithShortLength(command.key, out);
-        out.writeUTF(command.cfName);
-        out.writeLong(cmd.timestamp);
-
-        CFMetaData metadata = Schema.instance.getCFMetaData(cmd.ksName, cmd.cfName);
-        metadata.comparator.namesQueryFilterSerializer().serialize(command.filter, out, version);
-    }
-
-    public ReadCommand deserialize(DataInput in, int version) throws IOException
-    {
-        boolean isDigest = in.readBoolean();
-        String keyspaceName = in.readUTF();
-        ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
-        String cfName = in.readUTF();
-        long timestamp = in.readLong();
-        CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
-        if (metadata == null)
-        {
-            String message = String.format("Got slice command for nonexistent table %s.%s.  If the table was just " +
-                    "created, this is likely due to the schema not being fully propagated.  Please wait for schema " +
-                    "agreement on table creation.", keyspaceName, cfName);
-            throw new UnknownColumnFamilyException(message, null);
-        }
-        NamesQueryFilter filter = metadata.comparator.namesQueryFilterSerializer().deserialize(in, version);
-        return new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest);
-    }
-
-    public long serializedSize(ReadCommand cmd, int version)
-    {
-        TypeSizes sizes = TypeSizes.NATIVE;
-        SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
-        int size = sizes.sizeof(command.isDigestQuery());
-        int keySize = command.key.remaining();
-
-        CFMetaData metadata = Schema.instance.getCFMetaData(cmd.ksName, cmd.cfName);
-
-        size += sizes.sizeof(command.ksName);
-        size += sizes.sizeof((short)keySize) + keySize;
-        size += sizes.sizeof(command.cfName);
-        size += sizes.sizeof(cmd.timestamp);
-        size += metadata.comparator.namesQueryFilterSerializer().serializedSize(command.filter, version);
-
-        return size;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
deleted file mode 100644
index 6995193..0000000
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import com.google.common.base.Objects;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.service.RowDataResolver;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
-
-public class SliceFromReadCommand extends ReadCommand
-{
-    private static final Logger logger = LoggerFactory.getLogger(SliceFromReadCommand.class);
-
-    static final SliceFromReadCommandSerializer serializer = new SliceFromReadCommandSerializer();
-
-    public final SliceQueryFilter filter;
-
-    public SliceFromReadCommand(String keyspaceName, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter)
-    {
-        super(keyspaceName, key, cfName, timestamp, Type.GET_SLICES);
-        this.filter = filter;
-    }
-
-    public ReadCommand copy()
-    {
-        return new SliceFromReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery());
-    }
-
-    public Row getRow(Keyspace keyspace)
-    {
-        CFMetaData cfm = Schema.instance.getCFMetaData(ksName, cfName);
-        DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-
-        // If we're doing a reversed query and the filter includes static columns, we need to issue two separate
-        // reads in order to guarantee that the static columns are fetched.  See CASSANDRA-8502 for more details.
-        if (filter.reversed && filter.hasStaticSlice(cfm))
-        {
-            logger.debug("Splitting reversed slice with static columns into two reads");
-            Pair<SliceQueryFilter, SliceQueryFilter> newFilters = filter.splitOutStaticSlice(cfm);
-
-            Row normalResults =  keyspace.getRow(new QueryFilter(dk, cfName, newFilters.right, timestamp));
-            Row staticResults =  keyspace.getRow(new QueryFilter(dk, cfName, newFilters.left, timestamp));
-
-            // add the static results to the start of the normal results
-            if (normalResults.cf == null)
-                return staticResults;
-
-            if (staticResults.cf != null)
-                for (Cell cell : staticResults.cf.getReverseSortedColumns())
-                    normalResults.cf.addColumn(cell);
-
-            return normalResults;
-        }
-
-        return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp));
-    }
-
-    @Override
-    public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row)
-    {
-        int maxLiveColumns = resolver.getMaxLiveCount();
-
-        int count = filter.count;
-        // We generate a retry if at least one node replies with count live columns but after merge we have fewer
-        // than the total number of columns we are interested in (which may be < count on a retry).
-        // So in particular, if no host returned count live columns, we know it's not a short read.
-        if (maxLiveColumns < count)
-            return null;
-
-        int liveCountInRow = row == null || row.cf == null ? 0 : filter.getLiveCount(row.cf, timestamp);
-        if (liveCountInRow < getOriginalRequestedCount())
-        {
-            // We asked t (= count) live columns and got l (=liveCountInRow) ones.
-            // From that, we can estimate that on this row, for x requested
-            // columns, only l/t end up live after reconciliation. So for next
-            // round we want to ask x column so that x * (l/t) == t, i.e. x = t^2/l.
-            int retryCount = liveCountInRow == 0 ? count + 1 : ((count * count) / liveCountInRow) + 1;
-            SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount);
-            return new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, newFilter, getOriginalRequestedCount());
-        }
-
-        return null;
-    }
-
-    @Override
-    public void maybeTrim(Row row)
-    {
-        if ((row == null) || (row.cf == null))
-            return;
-
-        filter.trim(row.cf, getOriginalRequestedCount(), timestamp);
-    }
-
-    public IDiskAtomFilter filter()
-    {
-        return filter;
-    }
-
-    public SliceFromReadCommand withUpdatedFilter(SliceQueryFilter newFilter)
-    {
-        return new SliceFromReadCommand(ksName, key, cfName, timestamp, newFilter);
-    }
-
-    /**
-     * The original number of columns requested by the user.
-     * This can be different from count when the slice command is a retry (see
-     * RetriedSliceFromReadCommand)
-     */
-    protected int getOriginalRequestedCount()
-    {
-        return filter.count;
-    }
-
-    @Override
-    public String toString()
-    {
-        return Objects.toStringHelper(this)
-                      .add("ksName", ksName)
-                      .add("cfName", cfName)
-                      .add("key", ByteBufferUtil.bytesToHex(key))
-                      .add("filter", filter)
-                      .add("timestamp", timestamp)
-                      .toString();
-    }
-}
-
-class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand>
-{
-    public void serialize(ReadCommand rm, DataOutputPlus out, int version) throws IOException
-    {
-        SliceFromReadCommand realRM = (SliceFromReadCommand)rm;
-        out.writeBoolean(realRM.isDigestQuery());
-        out.writeUTF(realRM.ksName);
-        ByteBufferUtil.writeWithShortLength(realRM.key, out);
-        out.writeUTF(realRM.cfName);
-        out.writeLong(realRM.timestamp);
-        CFMetaData metadata = Schema.instance.getCFMetaData(realRM.ksName, realRM.cfName);
-        metadata.comparator.sliceQueryFilterSerializer().serialize(realRM.filter, out, version);
-    }
-
-    public ReadCommand deserialize(DataInput in, int version) throws IOException
-    {
-        boolean isDigest = in.readBoolean();
-        String keyspaceName = in.readUTF();
-        ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
-        String cfName = in.readUTF();
-        long timestamp = in.readLong();
-        CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
-        if (metadata == null)
-        {
-            String message = String.format("Got slice command for nonexistent table %s.%s.  If the table was just " +
-                    "created, this is likely due to the schema not being fully propagated.  Please wait for schema " +
-                    "agreement on table creation.", keyspaceName, cfName);
-            throw new UnknownColumnFamilyException(message, null);
-        }
-        SliceQueryFilter filter = metadata.comparator.sliceQueryFilterSerializer().deserialize(in, version);
-        return new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest);
-    }
-
-    public long serializedSize(ReadCommand cmd, int version)
-    {
-        TypeSizes sizes = TypeSizes.NATIVE;
-        SliceFromReadCommand command = (SliceFromReadCommand) cmd;
-        int keySize = command.key.remaining();
-
-        CFMetaData metadata = Schema.instance.getCFMetaData(cmd.ksName, cmd.cfName);
-
-        int size = sizes.sizeof(cmd.isDigestQuery()); // boolean
-        size += sizes.sizeof(command.ksName);
-        size += sizes.sizeof((short) keySize) + keySize;
-        size += sizes.sizeof(command.cfName);
-        size += sizes.sizeof(cmd.timestamp);
-        size += metadata.comparator.sliceQueryFilterSerializer().serializedSize(command.filter, version);
-
-        return size;
-    }
-}
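
The short-read retry estimate in the removed maybeGenerateRetryCommand is worth a concrete number. A standalone sketch of the same arithmetic (illustrative, not part of the patch):

    // If we asked for t (= count) live columns and only l (= liveCountInRow)
    // survived reconciliation, assume the same survival ratio l/t on the next
    // round and solve x * (l/t) == t for x, giving x = t*t/l (+1 rounds up).
    static int retryCount(int count, int liveCountInRow)
    {
        return liveCountInRow == 0 ? count + 1 : ((count * count) / liveCountInRow) + 1;
    }

    // retryCount(100, 20) == 501: at a 20% survival rate, requesting about
    // 500 columns should yield roughly the 100 live ones originally asked for.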

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Slices.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slices.java b/src/java/org/apache/cassandra/db/Slices.java
new file mode 100644
index 0000000..ec7797d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/Slices.java
@@ -0,0 +1,898 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * Represents the selection of multiple ranges of rows within a partition.
+ * <p>
+ * A {@code Slices} is basically a list of {@code Slice}, though those are guaranteed to be non-overlapping
+ * and always in clustering order.
+ */
+public abstract class Slices implements Iterable<Slice>
+{
+    public static final Serializer serializer = new Serializer();
+
+    /** Slices selecting all the rows of a partition. */
+    public static final Slices ALL = new SelectAllSlices();
+    /** Slices selecting no rows in a partition. */
+    public static final Slices NONE = new SelectNoSlices();
+
+    protected Slices()
+    {
+    }
+
+    /**
+     * Creates a {@code Slices} object that contains a single slice.
+     *
+     * @param comparator the comparator for the table {@code slice} is a slice of.
+     * @param slice the single slice that the returned object should contain.
+     *
+     * @return the newly created {@code Slices} object.
+     */
+    public static Slices with(ClusteringComparator comparator, Slice slice)
+    {
+        if (slice.start() == Slice.Bound.BOTTOM && slice.end() == Slice.Bound.TOP)
+            return Slices.ALL;
+
+        assert comparator.compare(slice.start(), slice.end()) <= 0;
+        return new ArrayBackedSlices(comparator, new Slice[]{ slice });
+    }
+
+    /**
+     * Whether the slices have a lower bound, that is, whether the first slice's start is {@code Slice.Bound.BOTTOM}.
+     *
+     * @return whether the slices have a lower bound.
+     */
+    public abstract boolean hasLowerBound();
+
+    /**
+     * Whether the slices have an upper bound, that is, whether the last slice's end is {@code Slice.Bound.TOP}.
+     *
+     * @return whether the slices have an upper bound.
+     */
+    public abstract boolean hasUpperBound();
+
+    /**
+     * The number of slices this object contains.
+     *
+     * @return the number of slices this object contains.
+     */
+    public abstract int size();
+
+    /**
+     * Returns the ith slice of this {@code Slices} object.
+     *
+     * @return the ith slice of this object.
+     */
+    public abstract Slice get(int i);
+
+    /**
+     * Returns slices for continuing the paging of those slices given the last returned clustering prefix.
+     *
+     * @param comparator the comparator for the table this is a filter for.
+     * @param lastReturned the last clustering that was returned for the query we are paging for. The
+     * resulting slices will be such that only results coming strictly after {@code lastReturned} are returned
+     * (where coming after means "greater than" if {@code !reversed} and "lesser than" otherwise).
+     * @param inclusive whether or not we want to include the {@code lastReturned} in the newly returned page of results.
+     * @param reversed whether the query we're paging for is reversed or not.
+     *
+     * @return new slices that select results coming after {@code lastReturned}.
+     */
+    public abstract Slices forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed);
+
+    /**
+     * An object that allows testing whether rows are selected by this {@code Slices} object, assuming those
+     * rows are tested in clustering order.
+     *
+     * @param reversed if true, the rows passed to the returned object will be assumed to be in reversed clustering
+     * order, otherwise they should be in clustering order.
+     *
+     * @return an object that tests for selection of rows by this {@code Slices} object.
+     */
+    public abstract InOrderTester inOrderTester(boolean reversed);
+
+    /**
+     * Whether a given clustering (row) is selected by this {@code Slices} object.
+     *
+     * @param clustering the clustering to test for selection.
+     *
+     * @return whether a given clustering (row) is selected by this {@code Slices} object.
+     */
+    public abstract boolean selects(Clustering clustering);
+
+    /**
+     * Given the per-clustering-column minimum and maximum values that an sstable contains, checks whether
+     * these slices potentially intersect that sstable.
+     * <p>
+     * Note that this is a coarse, per-column check: it can return true for an sstable that contains no
+     * selected row, but should not return false for one that does.
+     *
+     * @param minClusteringValues the smallest value for each clustering column that the sstable contains.
+     * @param maxClusteringValues the largest value for each clustering column that the sstable contains.
+     *
+     * @return whether these slices might intersect an sstable having {@code minClusteringValues} and
+     * {@code maxClusteringValues}.
+     */
+    public abstract boolean intersects(List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues);
+
+    /**
+     * Given a sliceable row iterator, returns a row iterator that only returns the rows selected by the
+     * slices of this {@code Slices} object.
+     *
+     * @param iter the sliceable iterator to filter.
+     *
+     * @return an iterator that only returns the rows (or rather {@code Unfiltered}s) of {@code iter} that are selected by these slices.
+     */
+    public abstract UnfilteredRowIterator makeSliceIterator(SliceableUnfilteredRowIterator iter);
+
+    public abstract String toCQLString(CFMetaData metadata);
+
+    /**
+     * A simple object that allows testing the inclusion of rows in these slices, assuming those rows
+     * are passed (to {@link #includes}) in clustering order (or reverse clustering order, depending
+     * on the argument passed to {@link #inOrderTester}).
+     */
+    public interface InOrderTester
+    {
+        public boolean includes(Clustering value);
+        public boolean isDone();
+    }
+
+    /**
+     * Builder to create {@code Slices} objects. Slices may be added in any order and may overlap;
+     * {@link #build} normalizes them into non-overlapping slices in clustering order.
+     */
+    public static class Builder
+    {
+        private final ClusteringComparator comparator;
+
+        private final List<Slice> slices;
+
+        private boolean needsNormalizing;
+
+        public Builder(ClusteringComparator comparator)
+        {
+            this.comparator = comparator;
+            this.slices = new ArrayList<>();
+        }
+
+        public Builder(ClusteringComparator comparator, int initialSize)
+        {
+            this.comparator = comparator;
+            this.slices = new ArrayList<>(initialSize);
+        }
+
+        public Builder add(Slice.Bound start, Slice.Bound end)
+        {
+            return add(Slice.make(start, end));
+        }
+
+        public Builder add(Slice slice)
+        {
+            assert comparator.compare(slice.start(), slice.end()) <= 0;
+            if (slices.size() > 0 && comparator.compare(slices.get(slices.size()-1).end(), slice.start()) > 0)
+                needsNormalizing = true;
+            slices.add(slice);
+            return this;
+        }
+
+        public int size()
+        {
+            return slices.size();
+        }
+
+        public Slices build()
+        {
+            if (slices.isEmpty())
+                return NONE;
+
+            if (slices.size() == 1 && slices.get(0) == Slice.ALL)
+                return ALL;
+
+            List<Slice> normalized = needsNormalizing
+                                   ? normalize(slices)
+                                   : slices;
+
+            return new ArrayBackedSlices(comparator, normalized.toArray(new Slice[normalized.size()]));
+        }
+
+        /**
+         * Given a list of slices (potentially overlapping and in any order), returns an equivalent list
+         * of non-overlapping slices in clustering order.
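+         * <p>
+         * For example, with a single integer clustering column, the (illustrative) slices
+         * {@code [5, 8]}, {@code [0, 3]} and {@code [2, 6]} normalize to the single slice {@code [0, 8]}.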
+         *
+         * @param slices a list of slices. This may be modified by this method.
+         * @return the smallest possible list of non-overlapping slices in clustering order. If the original
+         * slices are already non-overlapping and in comparator order, this may or may not return the provided
+         * list directly.
+         */
+        private List<Slice> normalize(List<Slice> slices)
+        {
+            if (slices.size() <= 1)
+                return slices;
+
+            Collections.sort(slices, new Comparator<Slice>()
+            {
+                @Override
+                public int compare(Slice s1, Slice s2)
+                {
+                    int c = comparator.compare(s1.start(), s2.start());
+                    if (c != 0)
+                        return c;
+
+                    return comparator.compare(s1.end(), s2.end());
+                }
+            });
+
+            List<Slice> slicesCopy = new ArrayList<>(slices.size());
+
+            Slice last = slices.get(0);
+
+            for (int i = 1; i < slices.size(); i++)
+            {
+                Slice s2 = slices.get(i);
+
+                boolean includesStart = last.includes(comparator, s2.start());
+                boolean includesFinish = last.includes(comparator, s2.end());
+
+                if (includesStart && includesFinish)
+                    continue;
+
+                if (!includesStart && !includesFinish)
+                {
+                    slicesCopy.add(last);
+                    last = s2;
+                    continue;
+                }
+
+                if (includesStart)
+                {
+                    last = Slice.make(last.start(), s2.end());
+                    continue;
+                }
+
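+                // Reaching this point would mean !includesStart && includesFinish, which is impossible
+                // for well-formed slices sorted by start: it would imply s2.end() < s2.start().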
+                assert !includesFinish;
+            }
+
+            slicesCopy.add(last);
+            return slicesCopy;
+        }
+    }
+
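+    /**
+     * Serializer for {@code Slices}.
+     * <p>
+     * The wire format is simply the number of slices as an int, followed by each slice serialized
+     * with {@code Slice.serializer} (using the table's clustering subtypes, or no types for {@code ALL}).
+     */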
+    public static class Serializer
+    {
+        public void serialize(Slices slices, DataOutputPlus out, int version) throws IOException
+        {
+            int size = slices.size();
+            out.writeInt(size);
+
+            if (size == 0)
+                return;
+
+            List<AbstractType<?>> types = slices == ALL
+                                        ? Collections.<AbstractType<?>>emptyList()
+                                        : ((ArrayBackedSlices)slices).comparator.subtypes();
+
+            for (Slice slice : slices)
+                Slice.serializer.serialize(slice, out, version, types);
+        }
+
+        public long serializedSize(Slices slices, int version, TypeSizes sizes)
+        {
+            long size = sizes.sizeof(slices.size());
+
+            if (slices.size() == 0)
+                return size;
+
+            List<AbstractType<?>> types = slices == ALL
+                                        ? Collections.<AbstractType<?>>emptyList()
+                                        : ((ArrayBackedSlices)slices).comparator.subtypes();
+
+            for (Slice slice : slices)
+                size += Slice.serializer.serializedSize(slice, version, types, sizes);
+
+            return size;
+        }
+
+        public Slices deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+        {
+            int size = in.readInt();
+
+            if (size == 0)
+                return NONE;
+
+            Slice[] slices = new Slice[size];
+            for (int i = 0; i < size; i++)
+                slices[i] = Slice.serializer.deserialize(in, version, metadata.comparator.subtypes());
+
+            if (size == 1 && slices[0].start() == Slice.Bound.BOTTOM && slices[0].end() == Slice.Bound.TOP)
+                return ALL;
+
+            return new ArrayBackedSlices(metadata.comparator, slices);
+        }
+    }
+
+    /**
+     * Simple {@code Slices} implementation that stores its slices in an array.
+     */
+    private static class ArrayBackedSlices extends Slices
+    {
+        private final ClusteringComparator comparator;
+
+        private final Slice[] slices;
+
+        private ArrayBackedSlices(ClusteringComparator comparator, Slice[] slices)
+        {
+            this.comparator = comparator;
+            this.slices = slices;
+        }
+
+        public int size()
+        {
+            return slices.length;
+        }
+
+        public boolean hasLowerBound()
+        {
+            return slices[0].start().size() != 0;
+        }
+
+        public boolean hasUpperBound()
+        {
+            return slices[slices.length - 1].end().size() != 0;
+        }
+
+        public Slice get(int i)
+        {
+            return slices[i];
+        }
+
+        public boolean selects(Clustering clustering)
+        {
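+            // Slices are in clustering order, so if the clustering sorts before the current slice's
+            // start, it cannot belong to this slice or any later one.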
+            for (int i = 0; i < slices.length; i++)
+            {
+                Slice slice = slices[i];
+                if (comparator.compare(clustering, slice.start()) < 0)
+                    return false;
+
+                if (comparator.compare(clustering, slice.end()) <= 0)
+                    return true;
+            }
+            return false;
+        }
+
+        public InOrderTester inOrderTester(boolean reversed)
+        {
+            return reversed ? new InReverseOrderTester() : new InForwardOrderTester();
+        }
+
+        public Slices forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed)
+        {
+            return reversed ? forReversePaging(comparator, lastReturned, inclusive) : forForwardPaging(comparator, lastReturned, inclusive);
+        }
+
+        private Slices forForwardPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive)
+        {
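+            // Slice#forPaging is expected to return null for slices falling entirely before the paging
+            // point; those are skipped, the first surviving slice is trimmed, and the rest kept as-is.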
+            for (int i = 0; i < slices.length; i++)
+            {
+                Slice slice = slices[i];
+                Slice newSlice = slice.forPaging(comparator, lastReturned, inclusive, false);
+                if (newSlice == null)
+                    continue;
+
+                if (slice == newSlice && i == 0)
+                    return this;
+
+                ArrayBackedSlices newSlices = new ArrayBackedSlices(comparator, Arrays.copyOfRange(slices, i, slices.length));
+                newSlices.slices[0] = newSlice;
+                return newSlices;
+            }
+            return Slices.NONE;
+        }
+
+        private Slices forReversePaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive)
+        {
+            for (int i = slices.length - 1; i >= 0; i--)
+            {
+                Slice slice = slices[i];
+                Slice newSlice = slice.forPaging(comparator, lastReturned, inclusive, true);
+                if (newSlice == null)
+                    continue;
+
+                if (slice == newSlice && i == slices.length - 1)
+                    return this;
+
+                ArrayBackedSlices newSlices = new ArrayBackedSlices(comparator, Arrays.copyOfRange(slices, 0, i + 1));
+                newSlices.slices[i] = newSlice;
+                return newSlices;
+            }
+            return Slices.NONE;
+        }
+
+        public boolean intersects(List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues)
+        {
+            for (Slice slice : this)
+            {
+                if (slice.intersects(comparator, minClusteringValues, maxClusteringValues))
+                    return true;
+            }
+            return false;
+        }
+
+        public UnfilteredRowIterator makeSliceIterator(final SliceableUnfilteredRowIterator iter)
+        {
+            return new WrappingUnfilteredRowIterator(iter)
+            {
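+                // Walks the slices in iteration order (last to first when reversed) and lazily asks
+                // the wrapped sliceable iterator for the content of each slice in turn.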
+                private int nextSlice = iter.isReverseOrder() ? slices.length - 1 : 0;
+                private Iterator<Unfiltered> currentSliceIterator = Collections.emptyIterator();
+
+                private Unfiltered next;
+
+                @Override
+                public boolean hasNext()
+                {
+                    prepareNext();
+                    return next != null;
+                }
+
+                @Override
+                public Unfiltered next()
+                {
+                    prepareNext();
+                    Unfiltered toReturn = next;
+                    next = null;
+                    return toReturn;
+                }
+
+                private boolean hasMoreSlice()
+                {
+                    return isReverseOrder()
+                         ? nextSlice >= 0
+                         : nextSlice < slices.length;
+                }
+
+                private Slice popNextSlice()
+                {
+                    return slices[isReverseOrder() ? nextSlice-- : nextSlice++];
+                }
+
+                private void prepareNext()
+                {
+                    if (next != null)
+                        return;
+
+                    while (true)
+                    {
+                        if (currentSliceIterator.hasNext())
+                        {
+                            next = currentSliceIterator.next();
+                            return;
+                        }
+
+                        if (!hasMoreSlice())
+                            return;
+
+                        currentSliceIterator = iter.slice(popNextSlice());
+                    }
+                }
+            };
+        }
+
+        public Iterator<Slice> iterator()
+        {
+            return Iterators.forArray(slices);
+        }
+
+        private class InForwardOrderTester implements InOrderTester
+        {
+            private int idx;
+            private boolean inSlice;
+
+            public boolean includes(Clustering value)
+            {
+                while (idx < slices.length)
+                {
+                    if (!inSlice)
+                    {
+                        int cmp = comparator.compare(value, slices[idx].start());
+                        // value < start
+                        if (cmp < 0)
+                            return false;
+
+                        inSlice = true;
+
+                        if (cmp == 0)
+                            return true;
+                    }
+
+                    // Here, start < value and inSlice
+                    if (comparator.compare(value, slices[idx].end()) <= 0)
+                        return true;
+
+                    ++idx;
+                    inSlice = false;
+                }
+                return false;
+            }
+
+            public boolean isDone()
+            {
+                return idx >= slices.length;
+            }
+        }
+
+        private class InReverseOrderTester implements InOrderTester
+        {
+            private int idx;
+            private boolean inSlice;
+
+            public InReverseOrderTester()
+            {
+                this.idx = slices.length - 1;
+            }
+
+            public boolean includes(Clustering value)
+            {
+                while (idx >= 0)
+                {
+                    if (!inSlice)
+                    {
+                        int cmp = comparator.compare(value, slices[idx].end());
+                        // value > end
+                        if (cmp > 0)
+                            return false;
+
+                        inSlice = true;
+
+                        if (cmp == 0)
+                            return true;
+                    }
+
+                    // Here, value <= end and inSlice
+                    if (comparator.compare(slices[idx].start(), value) <= 0)
+                        return true;
+
+                    --idx;
+                    inSlice = false;
+                }
+                return false;
+            }
+
+            public boolean isDone()
+            {
+                return idx < 0;
+            }
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder();
+            sb.append("{");
+            for (int i = 0; i < slices.length; i++)
+            {
+                if (i > 0)
+                    sb.append(", ");
+                sb.append(slices[i].toString(comparator));
+            }
+            return sb.append("}").toString();
+        }
+
+        public String toCQLString(CFMetaData metadata)
+        {
+            StringBuilder sb = new StringBuilder();
+
+            // In CQL, conditions are expressed per column, so first group things that way,
+            // i.e. for each column, we create a list of what each slice contains for that column.
+            int clusteringSize = metadata.clusteringColumns().size();
+            List<List<ComponentOfSlice>> columnComponents = new ArrayList<>(clusteringSize);
+            for (int i = 0; i < clusteringSize; i++)
+            {
+                List<ComponentOfSlice> perSlice = new ArrayList<>();
+                columnComponents.add(perSlice);
+
+                for (int j = 0; j < slices.length; j++)
+                {
+                    ComponentOfSlice c = ComponentOfSlice.fromSlice(i, slices[j]);
+                    if (c != null)
+                        perSlice.add(c);
+                }
+            }
+
+            boolean needAnd = false;
+            for (int i = 0; i < clusteringSize; i++)
+            {
+                ColumnDefinition column = metadata.clusteringColumns().get(i);
+                List<ComponentOfSlice> componentInfo = columnComponents.get(i);
+                if (componentInfo.isEmpty())
+                    break;
+
+                // For a given column, there are only 3 cases that CQL currently generates:
+                //   1) every slice is EQ with the same value: it's a simple '=' relation.
+                //   2) every slice is EQ but with different values: it's an IN relation.
+                //   3) no slice is EQ but they all have the same values: we have inequality relations.
+                // Note that this doesn't cover everything ReadCommand can express, but
+                // as it's all that CQL supports for now, we'll ignore the other cases (which would then
+                // display a bogus query, but that's not the end of the world).
+                // TODO: we should improve this at some point.
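+                // For instance, two (illustrative) slices [(1)..(1)] and [(2)..(2)] on a single clustering
+                // column "a" fall under case 2 above and render as "a IN (1, 2)".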
+                ComponentOfSlice first = componentInfo.get(0);
+                if (first.isEQ())
+                {
+                    if (needAnd)
+                        sb.append(" AND ");
+                    needAnd = true;
+
+                    sb.append(column.name);
+
+                    Set<ByteBuffer> values = new LinkedHashSet<>();
+                    for (int j = 0; j < componentInfo.size(); j++)
+                        values.add(componentInfo.get(j).startValue);
+
+                    if (values.size() == 1)
+                    {
+                        sb.append(" = ").append(column.type.getString(first.startValue));
+                    }
+                    else
+                    {
+                        sb.append(" IN (");
+                        int j = 0;
+                        for (ByteBuffer value : values)
+                            sb.append(j++ == 0 ? "" : ", ").append(column.type.getString(value));
+                        sb.append(")");
+                    }
+                }
+                else
+                {
+                    // As said above, we assume (without checking) that this means all ComponentOfSlice for this column
+                    // are the same, so we only bother about the first.
+                    if (first.startValue != null)
+                    {
+                        if (needAnd)
+                            sb.append(" AND ");
+                        needAnd = true;
+                        sb.append(column.name).append(first.startInclusive ? " >= " : " > ").append(column.type.getString(first.startValue));
+                    }
+                    if (first.endValue != null)
+                    {
+                        if (needAnd)
+                            sb.append(" AND ");
+                        needAnd = true;
+                        sb.append(column.name).append(first.endInclusive ? " <= " : " < ").append(column.type.getString(first.endValue));
+                    }
+                }
+            }
+            return sb.toString();
+        }
+
+        // A somewhat ad hoc utility class, only used by toCQLString
+        private static class ComponentOfSlice
+        {
+            public final boolean startInclusive;
+            public final ByteBuffer startValue;
+            public final boolean endInclusive;
+            public final ByteBuffer endValue;
+
+            private ComponentOfSlice(boolean startInclusive, ByteBuffer startValue, boolean endInclusive, ByteBuffer endValue)
+            {
+                this.startInclusive = startInclusive;
+                this.startValue = startValue;
+                this.endInclusive = endInclusive;
+                this.endValue = endValue;
+            }
+
+            public static ComponentOfSlice fromSlice(int component, Slice slice)
+            {
+                Slice.Bound start = slice.start();
+                Slice.Bound end = slice.end();
+
+                if (component >= start.size() && component >= end.size())
+                    return null;
+
+                boolean startInclusive = true, endInclusive = true;
+                ByteBuffer startValue = null, endValue = null;
+                if (component < start.size())
+                {
+                    startInclusive = start.isInclusive();
+                    startValue = start.get(component);
+                }
+                if (component < end.size())
+                {
+                    endInclusive = end.isInclusive();
+                    endValue = end.get(component);
+                }
+                return new ComponentOfSlice(startInclusive, startValue, endInclusive, endValue);
+            }
+
+            public boolean isEQ()
+            {
+                return startValue.equals(endValue);
+            }
+        }
+    }
+
+    /**
+     * Specialized implementation of {@code Slices} that selects all rows.
+     * <p>
+     * This is equivalent to having the single {@code Slice.ALL} slice, but is somewhat more efficient.
+     */
+    private static class SelectAllSlices extends Slices
+    {
+        private static final InOrderTester trivialTester = new InOrderTester()
+        {
+            public boolean includes(Clustering value)
+            {
+                return true;
+            }
+
+            public boolean isDone()
+            {
+                return false;
+            }
+        };
+
+        public int size()
+        {
+            return 1;
+        }
+
+        public Slice get(int i)
+        {
+            return Slice.ALL;
+        }
+
+        public boolean hasLowerBound()
+        {
+            return false;
+        }
+
+        public boolean hasUpperBound()
+        {
+            return false;
+        }
+
+        public boolean selects(Clustering clustering)
+        {
+            return true;
+        }
+
+        public Slices forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed)
+        {
+            return new ArrayBackedSlices(comparator, new Slice[]{ Slice.ALL.forPaging(comparator, lastReturned, inclusive, reversed) });
+        }
+
+        public InOrderTester inOrderTester(boolean reversed)
+        {
+            return trivialTester;
+        }
+
+        public boolean intersects(List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues)
+        {
+            return true;
+        }
+
+        public UnfilteredRowIterator makeSliceIterator(SliceableUnfilteredRowIterator iter)
+        {
+            return iter;
+        }
+
+        public Iterator<Slice> iterator()
+        {
+            return Iterators.singletonIterator(Slice.ALL);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "ALL";
+        }
+
+        public String toCQLString(CFMetaData metadata)
+        {
+            return "";
+        }
+    }
+
+    /**
+     * Specialized implementation of {@code Slices} that selects no rows.
+     */
+    private static class SelectNoSlices extends Slices
+    {
+        private static final InOrderTester trivialTester = new InOrderTester()
+        {
+            public boolean includes(Clustering value)
+            {
+                return false;
+            }
+
+            public boolean isDone()
+            {
+                return true;
+            }
+        };
+
+        public int size()
+        {
+            return 0;
+        }
+
+        public Slice get(int i)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean hasLowerBound()
+        {
+            return false;
+        }
+
+        public boolean hasUpperBound()
+        {
+            return false;
+        }
+
+        public Slices forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed)
+        {
+            return this;
+        }
+
+        public boolean selects(Clustering clustering)
+        {
+            return false;
+        }
+
+        public InOrderTester inOrderTester(boolean reversed)
+        {
+            return trivialTester;
+        }
+
+        public boolean intersects(List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues)
+        {
+            return false;
+        }
+
+        public UnfilteredRowIterator makeSliceIterator(SliceableUnfilteredRowIterator iter)
+        {
+            return UnfilteredRowIterators.emptyIterator(iter.metadata(), iter.partitionKey(), iter.isReverseOrder());
+        }
+
+        public Iterator<Slice> iterator()
+        {
+            return Iterators.emptyIterator();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "NONE";
+        }
+
+        public String toCQLString(CFMetaData metadata)
+        {
+            return "";
+        }
+    }
+}