You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:58 UTC
[34/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
new file mode 100644
index 0000000..65b4e3f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.*;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
+import org.apache.cassandra.thrift.ThriftResultsMerger;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.memory.HeapAllocator;
+
+/**
+ * A single-partition read command whose rows are selected by a {@link ClusteringIndexSliceFilter},
+ * that is, by one or more clustering slices of the queried partition.
+ */
+public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<ClusteringIndexSliceFilter>
+{
+ /**
+ * Creates a slice command with every parameter given explicitly.
+ *
+ * @param isDigest whether this is a digest query; {@link #copy()} passes {@code isDigestQuery()} back into this slot.
+ * @param isForThrift whether the command is issued for Thrift; {@link #copy()} passes {@code isForThrift()} back into this slot.
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param columnFilter the column filter of the query.
+ * @param rowFilter the row filter of the query.
+ * @param limits the limits of the query.
+ * @param partitionKey the key of the partition to query.
+ * @param clusteringIndexFilter the slice filter selecting the rows to read within the partition.
+ */
+ public SinglePartitionSliceCommand(boolean isDigest,
+ boolean isForThrift,
+ CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexSliceFilter clusteringIndexFilter)
+ {
+ super(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ }
+
+ /**
+ * Creates a non-digest, non-Thrift slice command, i.e. the full constructor with both
+ * {@code isDigest} and {@code isForThrift} set to {@code false}.
+ */
+ public SinglePartitionSliceCommand(CFMetaData metadata,
+ int nowInSec,
+ ColumnFilter columnFilter,
+ RowFilter rowFilter,
+ DataLimits limits,
+ DecoratedKey partitionKey,
+ ClusteringIndexSliceFilter clusteringIndexFilter)
+ {
+ this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+ }
+
+ /**
+ * Creates a new single partition slice command for the provided single slice.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param slice the slice of rows to query.
+ *
+ * @return a newly created read command that queries {@code slice} in {@code key}. The returned query will
+ * query every column of the table (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slice slice)
+ {
+ return create(metadata, nowInSec, key, Slices.with(metadata.comparator, slice));
+ }
+
+ /**
+ * Creates a new single partition slice command for the provided slices.
+ *
+ * @param metadata the table to query.
+ * @param nowInSec the time in seconds to use as "now" for this query.
+ * @param key the partition key for the partition to query.
+ * @param slices the slices of rows to query.
+ *
+ * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
+ * query every column of the table (without limit or row filtering) and be in forward order.
+ */
+ public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices)
+ {
+ // 'false' is presumably the filter's reversed flag, giving the forward order documented above.
+ ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false);
+ return new SinglePartitionSliceCommand(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+ }
+
+ /**
+ * Returns a new command built from exactly the same parameters as this one.
+ */
+ public SinglePartitionSliceCommand copy()
+ {
+ return new SinglePartitionSliceCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
+ }
+
+ /**
+ * Reads the queried partition from the memtables and sstables of {@code cfs} and merges the results.
+ * <p>
+ * Sstables whose clustering range does not intersect the slice filter are skipped in a first pass.
+ * The skipped ones that may hold tombstones are re-examined in a second pass. They are included only
+ * if their partition-level deletion is newer than the smallest min timestamp among the sstables
+ * visited by the first pass.
+ *
+ * @param cfs the table store to read from.
+ * @param copyOnHeap if true, memtable iterators are wrapped with {@code UnfilteredRowIterators.cloningIterator}
+ * using {@link HeapAllocator#instance} (presumably so the result does not reference memtable-owned memory).
+ *
+ * @return the merged iterator over every collected source, or an empty iterator if no memtable or sstable
+ * iterator was collected. If a {@code RuntimeException} or {@code Error} is thrown, every iterator opened
+ * so far is closed before the exception is rethrown.
+ */
+ protected UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
+ {
+ Tracing.trace("Acquiring sstable references");
+ ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(partitionKey()));
+
+ // Upper bound: at most one iterator per memtable and per sstable.
+ List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
+ ClusteringIndexSliceFilter filter = clusteringIndexFilter();
+
+ try
+ {
+ // Memtables: only those that currently hold the partition contribute an iterator.
+ for (Memtable memtable : view.memtables)
+ {
+ Partition partition = memtable.getPartition(partitionKey());
+ if (partition == null)
+ continue;
+
+ @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+ UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
+ @SuppressWarnings("resource") // same as above
+ UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
+ iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
+ }
+
+ /*
+ * We can't eliminate full sstables based on the timestamp of what we've already read like
+ * in collectTimeOrderedData, but we still want to eliminate sstables whose maxTimestamp < mostRecentTombstone
+ * we've read. We still rely on the sstable ordering by maxTimestamp since if
+ * maxTimestamp_s1 > maxTimestamp_s0,
+ * we're guaranteed that s1 cannot have a row tombstone such that
+ * timestamp(tombstone) > maxTimestamp_s0
+ * since we necessarily have
+ * timestamp(tombstone) <= maxTimestamp_s1
+ * In other words, iterating in maxTimestamp order allows us to do our mostRecentPartitionTombstone elimination
+ * in one pass, and minimize the number of sstables for which we read a partition tombstone.
+ */
+ int sstablesIterated = 0;
+ // Sorts view.sstables in place. The early 'break' in the first pass below is only valid in
+ // most-recent-first (descending maxTimestamp) order, presumably what this comparator yields.
+ Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+ // Non-intersecting sstables that may hold tombstones; re-examined in the second pass.
+ List<SSTableReader> skippedSSTables = null;
+ // Newest partition-level deletion seen so far in the sstables opened by the first pass.
+ long mostRecentPartitionTombstone = Long.MIN_VALUE;
+ // NOTE(review): minTimestamp is only fed from sstables below; the memtable data collected above never
+ // lowers it. A partition tombstone in a skipped sstable that is newer than some memtable data but not
+ // newer than minTimestamp is therefore ignored by the second pass. Confirm this cannot hide a deletion.
+ long minTimestamp = Long.MAX_VALUE;
+ int nonIntersectingSSTables = 0;
+
+ // First pass: open every sstable that intersects the slice filter.
+ for (SSTableReader sstable : view.sstables)
+ {
+ // Updated before the intersection check, so skipped sstables lower minTimestamp too.
+ minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
+ // if we've already seen a partition tombstone with a timestamp greater
+ // than the most recent update to this sstable, we can skip it
+ if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
+ break;
+
+ if (!filter.shouldInclude(sstable))
+ {
+ nonIntersectingSSTables++;
+ // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
+ if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE)
+ {
+ if (skippedSSTables == null)
+ skippedSSTables = new ArrayList<>();
+ skippedSSTables.add(sstable);
+ }
+ continue;
+ }
+
+ sstable.incrementReadCount();
+ @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+ UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
+ iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
+ mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
+ sstablesIterated++;
+ }
+
+ int includedDueToTombstones = 0;
+ // Second pass: check for partition tombstones in the skipped sstables
+ if (skippedSSTables != null)
+ {
+ for (SSTableReader sstable : skippedSSTables)
+ {
+ // Any deletion in this sstable has a timestamp <= its max timestamp (see the comment above),
+ // so it cannot hold a deletion newer than minTimestamp: don't even open it.
+ if (sstable.getMaxTimestamp() <= minTimestamp)
+ continue;
+
+ sstable.incrementReadCount();
+ @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is closed on exception, or through the closing of the final merged iterator
+ UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
+ // Keep this sstable only for its partition deletion, and only if that deletion is newer than minTimestamp.
+ if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
+ {
+ // NOTE(review): unlike the iterators added above, this one is not passed through
+ // ThriftResultsMerger.maybeWrap when isForThrift(). Confirm that is intended.
+ iterators.add(iter);
+ includedDueToTombstones++;
+ sstablesIterated++;
+ }
+ else
+ {
+ // Not needed: release it right away.
+ iter.close();
+ }
+ }
+ }
+ if (Tracing.isTracing())
+ Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
+ nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
+
+ cfs.metric.updateSSTableIterated(sstablesIterated);
+
+ // No memtable held the partition and no sstable was kept: nothing to merge.
+ if (iterators.isEmpty())
+ return UnfilteredRowIterators.emptyIterator(cfs.metadata, partitionKey(), filter.isReversed());
+
+ Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
+
+ @SuppressWarnings("resource") // Closed through the closing of the result of that method.
+ UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec());
+ // Record the partition key with the READS sampler when the merged result is non-empty.
+ if (!merged.isEmpty())
+ {
+ DecoratedKey key = merged.partitionKey();
+ cfs.metric.samplers.get(Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+ }
+
+ return merged;
+ }
+ catch (RuntimeException | Error e)
+ {
+ // Release every iterator opened so far; failures while closing are attached as suppressed exceptions.
+ try
+ {
+ FBUtilities.closeAll(iterators);
+ }
+ catch (Exception suppressed)
+ {
+ e.addSuppressed(suppressed);
+ }
+ throw e;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Slice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slice.java b/src/java/org/apache/cassandra/db/Slice.java
new file mode 100644
index 0000000..dae491e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/Slice.java
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * A slice represents the selection of a range of rows.
+ * <p>
+ * A slice has a start and an end bound that are both (potentially full) clustering prefixes.
+ * A slice selects every row whose clustering is bigger than the slice start prefix but smaller
+ * than the end prefix. Both start and end can be either inclusive or exclusive.
+ * <p>
+ * The constructor is private: instances come from the static {@code make} factories, and
+ * {@link #ALL} is the slice selecting every row of a partition.
+ */
+public class Slice
+{
+ /** Serializes a slice as its start bound followed by its end bound. */
+ public static final Serializer serializer = new Serializer();
+
+ /**
+ * The slice selecting all rows (of a given partition).
+ * <p>
+ * Its overrides answer {@code true} / {@code "ALL"} directly instead of comparing bounds.
+ */
+ public static final Slice ALL = new Slice(Bound.BOTTOM, Bound.TOP)
+ {
+ @Override
+ public boolean selects(ClusteringComparator comparator, Clustering clustering)
+ {
+ return true;
+ }
+
+ @Override
+ public boolean intersects(ClusteringComparator comparator, List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues)
+ {
+ return true;
+ }
+
+ @Override
+ public String toString(ClusteringComparator comparator)
+ {
+ return "ALL";
+ }
+ };
+
+ // A start bound and an end bound respectively (asserted by the constructor).
+ private final Bound start;
+ private final Bound end;
+
+ // Private: callers go through the make(...) factories.
+ private Slice(Bound start, Bound end)
+ {
+ assert start.isStart() && end.isEnd();
+ this.start = start.takeAlias();
+ this.end = end.takeAlias();
+ }
+
+ /**
+ * Creates a slice between two bounds.
+ *
+ * @param start the start of the slice; must be a start bound.
+ * @param end the end of the slice; must be an end bound.
+ * @return {@link #ALL} when {@code start} and {@code end} are the very {@link Bound#BOTTOM} and
+ * {@link Bound#TOP} instances (identity comparison), a new slice otherwise.
+ */
+ public static Slice make(Bound start, Bound end)
+ {
+ if (start == Bound.BOTTOM && end == Bound.TOP)
+ return ALL;
+
+ return new Slice(start, end);
+ }
+
+ /**
+ * Creates a slice whose inclusive start and inclusive end are both built from the same clustering
+ * prefix {@code values} (presumably selecting every row having {@code values} as clustering prefix).
+ * <p>
+ * {@code ByteBuffer} values are added as-is; any other value is handed to {@code CBuilder.add(Object)}.
+ *
+ * @param comparator the comparator of the table this is a slice of.
+ * @param values the clustering prefix values.
+ * @return a new slice; this never returns the {@link #ALL} instance, even when {@code values} is empty.
+ */
+ public static Slice make(ClusteringComparator comparator, Object... values)
+ {
+ CBuilder builder = CBuilder.create(comparator);
+ for (int i = 0; i < values.length; i++)
+ {
+ Object val = values[i];
+ if (val instanceof ByteBuffer)
+ builder.add((ByteBuffer)val);
+ else
+ builder.add(val);
+ }
+ // buildBound(isStart, isInclusive): an inclusive start and an inclusive end on the same prefix.
+ return new Slice(builder.buildBound(true, true), builder.buildBound(false, true));
+ }
+
+ /**
+ * Creates a slice selecting exactly {@code clustering}, i.e. an inclusive start and an inclusive end
+ * on its values.
+ *
+ * @param clustering the clustering to select; must not be {@code Clustering.STATIC_CLUSTERING}.
+ * @return the newly created slice.
+ */
+ public static Slice make(Clustering clustering)
+ {
+ // Rejected for the static clustering: building bounds from its values doesn't give the wanted slice.
+ assert clustering != Clustering.STATIC_CLUSTERING;
+ ByteBuffer[] values = extractValues(clustering);
+ return new Slice(Bound.inclusiveStartOf(values), Bound.inclusiveEndOf(values));
+ }
+
+ /**
+ * Creates a slice going from {@code start} to {@code end}, both inclusive.
+ *
+ * @param start the first clustering selected; must not be {@code Clustering.STATIC_CLUSTERING}.
+ * @param end the last clustering selected; must not be {@code Clustering.STATIC_CLUSTERING}.
+ * @return the newly created slice.
+ */
+ public static Slice make(Clustering start, Clustering end)
+ {
+ // Rejected for the static clustering: building bounds from its values doesn't give the wanted slice.
+ assert start != Clustering.STATIC_CLUSTERING && end != Clustering.STATIC_CLUSTERING;
+
+ ByteBuffer[] startValues = extractValues(start);
+ ByteBuffer[] endValues = extractValues(end);
+
+ return new Slice(Bound.inclusiveStartOf(startValues), Bound.inclusiveEndOf(endValues));
+ }
+
+ // Copies the values of a clustering prefix into a new array (the buffers themselves are shared, not copied).
+ private static ByteBuffer[] extractValues(ClusteringPrefix clustering)
+ {
+ ByteBuffer[] values = new ByteBuffer[clustering.size()];
+ for (int i = 0; i < clustering.size(); i++)
+ values[i] = clustering.get(i);
+ return values;
+ }
+
+ /** Returns the start bound of this slice. */
+ public Bound start()
+ {
+ return start;
+ }
+
+ /** Returns the end bound of this slice. */
+ public Bound end()
+ {
+ return end;
+ }
+
+ /**
+ * Returns the bound a scan in the given direction begins at: the end bound if {@code reversed},
+ * the start bound otherwise.
+ */
+ public Bound open(boolean reversed)
+ {
+ return reversed ? end : start;
+ }
+
+ /**
+ * Returns the bound a scan in the given direction finishes at: the start bound if {@code reversed},
+ * the end bound otherwise.
+ */
+ public Bound close(boolean reversed)
+ {
+ return reversed ? start : end;
+ }
+
+ /**
+ * Returns whether this slice is empty, i.e. whether its end sorts strictly before its start.
+ *
+ * @param comparator the comparator to compare the bounds.
+ * @return whether the slice formed is empty or not.
+ */
+ public boolean isEmpty(ClusteringComparator comparator)
+ {
+ return isEmpty(comparator, start(), end());
+ }
+
+ /**
+ * Returns whether the slice formed by the two provided bounds is empty or not, i.e. whether
+ * {@code end} sorts strictly before {@code start}.
+ *
+ * @param comparator the comparator to compare the bounds.
+ * @param start the start for the slice to consider. This must be a start bound.
+ * @param end the end for the slice to consider. This must be an end bound.
+ * @return whether the slice formed by {@code start} and {@code end} is
+ * empty or not.
+ */
+ public static boolean isEmpty(ClusteringComparator comparator, Slice.Bound start, Slice.Bound end)
+ {
+ assert start.isStart() && end.isEnd();
+ return comparator.compare(end, start) < 0;
+ }
+
+ /**
+ * Returns whether a given clustering is selected by this slice.
+ *
+ * @param comparator the comparator for the table this is a slice of.
+ * @param clustering the clustering to test inclusion of.
+ *
+ * @return whether {@code clustering} is selected by this slice.
+ */
+ public boolean selects(ClusteringComparator comparator, Clustering clustering)
+ {
+ return comparator.compare(start, clustering) <= 0 && comparator.compare(clustering, end) <= 0;
+ }
+
+ /**
+ * Returns whether a given bound is included in this slice.
+ *
+ * @param comparator the comparator for the table this is a slice of.
+ * @param bound the bound to test inclusion of.
+ *
+ * @return whether {@code bound} is within the bounds of this slice.
+ */
+ public boolean includes(ClusteringComparator comparator, Bound bound)
+ {
+ return comparator.compare(start, bound) <= 0 && comparator.compare(bound, end) <= 0;
+ }
+
+ /**
+ * Returns a slice for continuing paging from the last returned clustering prefix.
+ *
+ * @param comparator the comparator for the table this is a filter for.
+ * @param lastReturned the last clustering that was returned for the query we are paging for. The
+ * resulting slice will be such that only results coming strictly after {@code lastReturned} (or also
+ * {@code lastReturned} itself, if {@code inclusive}) are returned (where coming after means "greater
+ * than" if {@code !reversed} and "lesser than" otherwise).
+ * @param inclusive whether or not we want to include the {@code lastReturned} in the newly returned page of results.
+ * @param reversed whether the query we're paging for is reversed or not.
+ *
+ * @return a new slice that selects results coming after {@code lastReturned}; {@code this} itself when
+ * {@code lastReturned} does not restrict it; or {@code null} if the resulting slice would select nothing
+ * (i.e. if this slice selects nothing coming after {@code lastReturned}).
+ */
+ public Slice forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed)
+ {
+ if (reversed)
+ {
+ // Reversed: paging moves from the end towards the start, so lastReturned can only become the new end.
+ int cmp = comparator.compare(lastReturned, start);
+ if (cmp < 0 || (!inclusive && cmp == 0))
+ return null;
+
+ // lastReturned does not cut into the slice from the end side: keep it unchanged.
+ cmp = comparator.compare(end, lastReturned);
+ if (cmp < 0 || (inclusive && cmp == 0))
+ return this;
+
+ ByteBuffer[] values = extractValues(lastReturned);
+ return new Slice(start, inclusive ? Bound.inclusiveEndOf(values) : Bound.exclusiveEndOf(values));
+ }
+ else
+ {
+ // Forward: paging moves from the start towards the end, so lastReturned can only become the new start.
+ int cmp = comparator.compare(end, lastReturned);
+ if (cmp < 0 || (!inclusive && cmp == 0))
+ return null;
+
+ // lastReturned does not cut into the slice from the start side: keep it unchanged.
+ cmp = comparator.compare(lastReturned, start);
+ if (cmp < 0 || (inclusive && cmp == 0))
+ return this;
+
+ ByteBuffer[] values = extractValues(lastReturned);
+ return new Slice(inclusive ? Bound.inclusiveStartOf(values) : Bound.exclusiveStartOf(values), end);
+ }
+ }
+
+ /**
+ * Given the per-clustering column minimum and maximum value a sstable contains, whether or not this slice potentially
+ * intersects that sstable or not.
+ *
+ * @param comparator the comparator for the table this is a slice of.
+ * @param minClusteringValues the smallest values for each clustering column that a sstable contains.
+ * @param maxClusteringValues the biggest values for each clustering column that a sstable contains.
+ *
+ * @return whether the slice might intersect with the sstable having {@code minClusteringValues} and
+ * {@code maxClusteringValues}. A {@code true} answer is conservative; {@code false} means it cannot intersect.
+ */
+ public boolean intersects(ClusteringComparator comparator, List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues)
+ {
+ // If this slice start after max or end before min, it can't intersect
+ if (start.compareTo(comparator, maxClusteringValues) > 0 || end.compareTo(comparator, minClusteringValues) < 0)
+ return false;
+
+ // We could safely return true here, but there's a minor optimization: if the first component
+ // of the slice is restricted to a single value (typically the slice is [4:5, 4:7]), we can
+ // check that the second component falls within the min/max for that component (and repeat for
+ // all components).
+ for (int j = 0; j < minClusteringValues.size() && j < maxClusteringValues.size(); j++)
+ {
+ ByteBuffer s = j < start.size() ? start.get(j) : null;
+ ByteBuffer f = j < end.size() ? end.get(j) : null;
+
+ // we already know the first component falls within its min/max range (otherwise we wouldn't get here)
+ if (j > 0 && (j < end.size() && comparator.compareComponent(j, f, minClusteringValues.get(j)) < 0 ||
+ j < start.size() && comparator.compareComponent(j, s, maxClusteringValues.get(j)) > 0))
+ return false;
+
+ // if this component isn't equal in the start and finish, we don't need to check any more
+ if (j >= start.size() || j >= end.size() || comparator.compareComponent(j, s, f) != 0)
+ break;
+ }
+ return true;
+ }
+
+ /** Same as {@link #toString(ClusteringComparator)} using the comparator of {@code metadata}. */
+ public String toString(CFMetaData metadata)
+ {
+ return toString(metadata.comparator);
+ }
+
+ /**
+ * Renders this slice for humans, e.g. {@code [a:b, c)}: '[' / ']' mark inclusive bounds, '(' / ')'
+ * exclusive ones, and the components of each bound are separated by ':'.
+ */
+ public String toString(ClusteringComparator comparator)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(start.isInclusive() ? "[" : "(");
+ for (int i = 0; i < start.size(); i++)
+ {
+ if (i > 0)
+ sb.append(":");
+ sb.append(comparator.subtype(i).getString(start.get(i)));
+ }
+ sb.append(", ");
+ for (int i = 0; i < end.size(); i++)
+ {
+ if (i > 0)
+ sb.append(":");
+ sb.append(comparator.subtype(i).getString(end.get(i)));
+ }
+ sb.append(end.isInclusive() ? "]" : ")");
+ return sb.toString();
+ }
+
+ // Two slices are equal when their start bounds and their end bounds are equal.
+ @Override
+ public boolean equals(Object other)
+ {
+ if(!(other instanceof Slice))
+ return false;
+
+ Slice that = (Slice)other;
+ return this.start().equals(that.start())
+ && this.end().equals(that.end());
+ }
+
+ // Consistent with equals: derived from the start and end bounds only.
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(start(), end());
+ }
+
+ /**
+ * Serializer for slices: the start bound followed by the end bound, both written with {@link Bound#serializer}.
+ */
+ public static class Serializer
+ {
+ public void serialize(Slice slice, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
+ {
+ Bound.serializer.serialize(slice.start, out, version, types);
+ Bound.serializer.serialize(slice.end, out, version, types);
+ }
+
+ public long serializedSize(Slice slice, int version, List<AbstractType<?>> types, TypeSizes sizes)
+ {
+ return Bound.serializer.serializedSize(slice.start, version, types, sizes)
+ + Bound.serializer.serializedSize(slice.end, version, types, sizes);
+ }
+
+ public Slice deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException
+ {
+ Bound start = Bound.serializer.deserialize(in, version, types);
+ Bound end = Bound.serializer.deserialize(in, version, types);
+ // Uses the constructor rather than make(), so a serialized ALL comes back as a distinct
+ // instance that does not carry ALL's overrides.
+ return new Slice(start, end);
+ }
+ }
+
+ /**
+ * The bound of a slice.
+ * <p>
+ * This can be either a start or an end bound, and this can be either inclusive or exclusive.
+ */
+ public static class Bound extends AbstractClusteringPrefix
+ {
+ // Heap size of a Bound holding no values; the base of the unsharedHeapSize computations below.
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new Bound(Kind.INCL_START_BOUND, new ByteBuffer[0]));
+ public static final Serializer serializer = new Serializer();
+
+ /** The smallest start bound, i.e. the one that starts before any row. */
+ public static final Bound BOTTOM = inclusiveStartOf();
+ /** The biggest end bound, i.e. the one that ends after any row. */
+ public static final Bound TOP = inclusiveEndOf();
+
+ protected final Kind kind;
+ protected final ByteBuffer[] values;
+
+ // No validation here: the static factories go through create(), which rejects boundary kinds,
+ // and withNewKind() asserts the same.
+ protected Bound(Kind kind, ByteBuffer[] values)
+ {
+ this.kind = kind;
+ this.values = values;
+ }
+
+ /**
+ * Creates a bound of the given kind over {@code values}.
+ *
+ * @param kind the kind of bound; must not be a boundary kind (asserted).
+ * @param values the bound values; the array is stored as-is, not copied.
+ * @return the new bound.
+ */
+ public static Bound create(Kind kind, ByteBuffer[] values)
+ {
+ assert !kind.isBoundary();
+ return new Bound(kind, values);
+ }
+
+ /** Returns the bound kind matching the given start/end and inclusive/exclusive flags. */
+ public static Kind boundKind(boolean isStart, boolean isInclusive)
+ {
+ return isStart
+ ? (isInclusive ? Kind.INCL_START_BOUND : Kind.EXCL_START_BOUND)
+ : (isInclusive ? Kind.INCL_END_BOUND : Kind.EXCL_END_BOUND);
+ }
+
+ /** Returns an inclusive start bound on {@code values} (array not copied). */
+ public static Bound inclusiveStartOf(ByteBuffer... values)
+ {
+ return create(Kind.INCL_START_BOUND, values);
+ }
+
+ /** Returns an inclusive end bound on {@code values} (array not copied). */
+ public static Bound inclusiveEndOf(ByteBuffer... values)
+ {
+ return create(Kind.INCL_END_BOUND, values);
+ }
+
+ /** Returns an exclusive start bound on {@code values} (array not copied). */
+ public static Bound exclusiveStartOf(ByteBuffer... values)
+ {
+ return create(Kind.EXCL_START_BOUND, values);
+ }
+
+ /** Returns an exclusive end bound on {@code values} (array not copied). */
+ public static Bound exclusiveEndOf(ByteBuffer... values)
+ {
+ return create(Kind.EXCL_END_BOUND, values);
+ }
+
+ /** Returns an exclusive start bound on the values of {@code prefix}, copied into a new array. */
+ public static Bound exclusiveStartOf(ClusteringPrefix prefix)
+ {
+ ByteBuffer[] values = new ByteBuffer[prefix.size()];
+ for (int i = 0; i < prefix.size(); i++)
+ values[i] = prefix.get(i);
+ return exclusiveStartOf(values);
+ }
+
+ /** Returns an inclusive end bound on the values of {@code prefix}, copied into a new array. */
+ public static Bound inclusiveEndOf(ClusteringPrefix prefix)
+ {
+ ByteBuffer[] values = new ByteBuffer[prefix.size()];
+ for (int i = 0; i < prefix.size(); i++)
+ values[i] = prefix.get(i);
+ return inclusiveEndOf(values);
+ }
+
+ /**
+ * Builds a bound from raw values using the table's comparator.
+ * <p>
+ * {@code ByteBuffer} values are added as-is; any other value is handed to {@code CBuilder.add(Object)}.
+ *
+ * @param comparator the comparator of the table the bound is for.
+ * @param isStart whether to build a start bound (an end bound otherwise).
+ * @param isInclusive whether the bound is inclusive (exclusive otherwise).
+ * @param values the bound's clustering prefix values.
+ * @return the new bound.
+ */
+ public static Bound create(ClusteringComparator comparator, boolean isStart, boolean isInclusive, Object... values)
+ {
+ CBuilder builder = CBuilder.create(comparator);
+ for (int i = 0; i < values.length; i++)
+ {
+ Object val = values[i];
+ if (val instanceof ByteBuffer)
+ builder.add((ByteBuffer)val);
+ else
+ builder.add(val);
+ }
+ return builder.buildBound(isStart, isInclusive);
+ }
+
+ /** Returns the kind of this bound. */
+ public Kind kind()
+ {
+ return kind;
+ }
+
+ /** Returns the number of clustering values in this bound. */
+ public int size()
+ {
+ return values.length;
+ }
+
+ /** Returns the {@code i}-th clustering value of this bound. */
+ public ByteBuffer get(int i)
+ {
+ return values[i];
+ }
+
+ /**
+ * Returns a bound with the given kind over the same values (the array is shared, not copied).
+ * {@code kind} must not be a boundary kind (asserted).
+ */
+ public Bound withNewKind(Kind kind)
+ {
+ assert !kind.isBoundary();
+ return new Bound(kind, values);
+ }
+
+ /** Whether this is a start bound, as reported by its kind. */
+ public boolean isStart()
+ {
+ return kind().isStart();
+ }
+
+ /** Whether this is an end bound, i.e. any bound that is not a start bound. */
+ public boolean isEnd()
+ {
+ return !isStart();
+ }
+
+ /** Whether this bound is an inclusive start or an inclusive end. */
+ public boolean isInclusive()
+ {
+ return kind == Kind.INCL_START_BOUND || kind == Kind.INCL_END_BOUND;
+ }
+
+ /** Whether this bound is an exclusive start or an exclusive end. */
+ public boolean isExclusive()
+ {
+ return kind == Kind.EXCL_START_BOUND || kind == Kind.EXCL_END_BOUND;
+ }
+
+ /**
+ * Returns the inverse of the current bound.
+ * <p>
+ * This inverts both start into end (and vice-versa) and inclusive into exclusive (and vice-versa).
+ *
+ * @return the inverse of this bound. For instance, if this bound is an exclusive start, this returns
+ * an inclusive end with the same values.
+ */
+ public Slice.Bound invert()
+ {
+ return withNewKind(kind().invert());
+ }
+
+ /** Returns the internal values array itself, not a copy: modifying it modifies this bound. */
+ public ByteBuffer[] getRawValues()
+ {
+ return values;
+ }
+
+ /**
+ * Feeds every value of this bound, then its kind ordinal, to {@code digest}. Values are passed as
+ * {@code duplicate()}s so that the positions of the stored buffers are left untouched.
+ */
+ public void digest(MessageDigest digest)
+ {
+ for (int i = 0; i < size(); i++)
+ digest.update(get(i).duplicate());
+ FBUtilities.updateWithByte(digest, kind().ordinal());
+ }
+
+ /**
+ * Writes this bound to {@code writer}: whatever the parent class writes (presumably the clustering
+ * values), followed by the bound kind.
+ */
+ public void writeTo(Slice.Bound.Writer writer)
+ {
+ super.writeTo(writer);
+ writer.writeBoundKind(kind());
+ }
+
+ // For use by intersects, it's called with the sstable bound opposite to the slice bound
+ // (so if the slice bound is a start, it's called with the max sstable bound).
+ // Returns a negative value, zero or a positive value as this bound sorts before, at, or after sstableBound.
+ private int compareTo(ClusteringComparator comparator, List<ByteBuffer> sstableBound)
+ {
+ for (int i = 0; i < sstableBound.size(); i++)
+ {
+ // Say the slice bound is a start. It means we're in the case where the max
+ // sstable bound is say (1:5) while the slice start is (1). So the start
+ // does start before the sstable end bound (and intersect it). It's the exact
+ // inverse with a end slice bound.
+ if (i >= size())
+ return isStart() ? -1 : 1;
+
+ int cmp = comparator.compareComponent(i, get(i), sstableBound.get(i));
+ if (cmp != 0)
+ return cmp;
+ }
+
+ // Say the slice bound is a start. It means we're in the case where the max
+ // sstable bound is say (1), while the slice start is (1:5). This again means
+ // that the slice starts before the end bound.
+ if (size() > sstableBound.size())
+ return isStart() ? -1 : 1;
+
+ // The slice bound is equal to the sstable bound. Results depends on whether the slice is inclusive or not
+ return isInclusive() ? 0 : (isStart() ? 1 : -1);
+ }
+
+ /** Same as {@link #toString(ClusteringComparator)} using the comparator of {@code metadata}. */
+ public String toString(CFMetaData metadata)
+ {
+ return toString(metadata.comparator);
+ }
+
+ /** Renders this bound for humans as {@code KIND(v1, v2, ...)}. */
+ public String toString(ClusteringComparator comparator)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(kind()).append("(");
+ for (int i = 0; i < size(); i++)
+ {
+ if (i > 0)
+ sb.append(", ");
+ sb.append(comparator.subtype(i).getString(get(i)));
+ }
+ return sb.append(")").toString();
+ }
+
+ // Overriding to get a more precise type. Returns this bound itself: no copy is made.
+ @Override
+ public Bound takeAlias()
+ {
+ return this;
+ }
+
+ // EMPTY_SIZE plus ObjectSizes.sizeOnHeapOf(values).
+ @Override
+ public long unsharedHeapSize()
+ {
+ return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
+ }
+
+ // EMPTY_SIZE plus ObjectSizes.sizeOnHeapExcludingData(values).
+ public long unsharedHeapSizeExcludingData()
+ {
+ return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
+ }
+
+ /** Returns a builder expecting exactly {@code size} clustering values. */
+ public static Builder builder(int size)
+ {
+ return new Builder(size);
+ }
+
+ /** A {@link ClusteringPrefix.Writer} that additionally receives the kind of the bound being written. */
+ public interface Writer extends ClusteringPrefix.Writer
+ {
+ public void writeBoundKind(Kind kind);
+ }
+
+ /**
+ * Collects the values (in order) and the kind of a bound, then builds it.
+ * {@link #build()} asserts that exactly the expected number of values was written.
+ */
+ public static class Builder implements Writer
+ {
+ private final ByteBuffer[] values;
+ private Kind kind;
+ private int idx;
+
+ private Builder(int size)
+ {
+ this.values = new ByteBuffer[size];
+ }
+
+ public void writeClusteringValue(ByteBuffer value)
+ {
+ values[idx++] = value;
+ }
+
+ public void writeBoundKind(Kind kind)
+ {
+ this.kind = kind;
+ }
+
+ public Slice.Bound build()
+ {
+ assert idx == values.length;
+ return Slice.Bound.create(kind, values);
+ }
+ }
+
+ /**
+ * Serializer for slice bounds.
+ * <p>
+ * Unlike {@code Clustering}, a slice bound may be a strict prefix of the full clustering, so its size is
+ * recorded explicitly. Format: the kind ordinal (1 byte), the number of values (written as a short, read
+ * back as an unsigned short), then the values without size.
+ */
+ public static class Serializer
+ {
+ public void serialize(Slice.Bound bound, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
+ {
+ out.writeByte(bound.kind().ordinal());
+ out.writeShort(bound.size());
+ ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, version, types);
+ }
+
+ public long serializedSize(Slice.Bound bound, int version, List<AbstractType<?>> types, TypeSizes sizes)
+ {
+ return 1 // kind ordinal
+ + sizes.sizeof((short)bound.size())
+ + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types, sizes);
+ }
+
+ public Slice.Bound deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException
+ {
+ Kind kind = Kind.values()[in.readByte()];
+ return deserializeValues(in, kind, version, types);
+ }
+
+ /**
+ * Reads the size and values of a bound whose kind has already been read.
+ * <p>
+ * An empty bound is mapped to the {@link #BOTTOM} or {@link #TOP} singleton based only on
+ * {@code kind.isStart()}. Since both are inclusive, an exclusive kind is not preserved in that case.
+ * NOTE(review): confirm empty exclusive bounds never occur.
+ */
+ public Slice.Bound deserializeValues(DataInput in, Kind kind, int version, List<AbstractType<?>> types) throws IOException
+ {
+ int size = in.readUnsignedShort();
+ if (size == 0)
+ return kind.isStart() ? BOTTOM : TOP;
+
+ Builder builder = builder(size);
+ ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, builder);
+ builder.writeBoundKind(kind);
+ return builder.build();
+ }
+
+ // Same as above, but hands the values and the kind to 'writer'; empty bounds are not special-cased.
+ public void deserializeValues(DataInput in, Bound.Kind kind, int version, List<AbstractType<?>> types, Writer writer) throws IOException
+ {
+ int size = in.readUnsignedShort();
+ ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, writer);
+ writer.writeBoundKind(kind);
+ }
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
deleted file mode 100644
index 65eefaa..0000000
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-
-import com.google.common.base.Objects;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/**
- * Single-partition read command selecting cells by name through a {@code NamesQueryFilter}
- * (command type {@code GET_BY_NAMES}).
- */
-public class SliceByNamesReadCommand extends ReadCommand
-{
- static final SliceByNamesReadCommandSerializer serializer = new SliceByNamesReadCommandSerializer();
-
- public final NamesQueryFilter filter;
-
- public SliceByNamesReadCommand(String keyspaceName, ByteBuffer key, String cfName, long timestamp, NamesQueryFilter filter)
- {
- super(keyspaceName, key, cfName, timestamp, Type.GET_BY_NAMES);
- this.filter = filter;
- }
-
- // Returns an equivalent command sharing the same filter instance, carrying over the digest-query flag.
- public ReadCommand copy()
- {
- return new SliceByNamesReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery());
- }
-
- // Decorates the raw partition key with the partitioner from StorageService, then reads through the keyspace.
- public Row getRow(Keyspace keyspace)
- {
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
- return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp));
- }
-
- @Override
- public String toString()
- {
- return Objects.toStringHelper(this)
- .add("ksName", ksName)
- .add("cfName", cfName)
- .add("key", ByteBufferUtil.bytesToHex(key))
- .add("filter", filter)
- .add("timestamp", timestamp)
- .toString();
- }
-
- public IDiskAtomFilter filter()
- {
- return filter;
- }
-}
-
-/**
- * Serializer for {@link SliceByNamesReadCommand}. Layout: digest flag (boolean), keyspace name (UTF),
- * partition key (short-length-prefixed bytes), table name (UTF), timestamp (long), then the names filter
- * encoded by the table comparator's names-filter serializer.
- */
-class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadCommand>
-{
- // NOTE(review): the table metadata is looked up without a null check here, unlike in deserialize().
- public void serialize(ReadCommand cmd, DataOutputPlus out, int version) throws IOException
- {
- SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
- out.writeBoolean(command.isDigestQuery());
- out.writeUTF(command.ksName);
- ByteBufferUtil.writeWithShortLength(command.key, out);
- out.writeUTF(command.cfName);
- out.writeLong(cmd.timestamp);
-
- CFMetaData metadata = Schema.instance.getCFMetaData(cmd.ksName, cmd.cfName);
- metadata.comparator.namesQueryFilterSerializer().serialize(command.filter, out, version);
- }
-
- // Reads the fields in serialize() order; an unknown table raises UnknownColumnFamilyException.
- public ReadCommand deserialize(DataInput in, int version) throws IOException
- {
- boolean isDigest = in.readBoolean();
- String keyspaceName = in.readUTF();
- ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
- String cfName = in.readUTF();
- long timestamp = in.readLong();
- CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
- if (metadata == null)
- {
- String message = String.format("Got slice command for nonexistent table %s.%s. If the table was just " +
- "created, this is likely due to the schema not being fully propagated. Please wait for schema " +
- "agreement on table creation.", keyspaceName, cfName);
- throw new UnknownColumnFamilyException(message, null);
- }
- NamesQueryFilter filter = metadata.comparator.namesQueryFilterSerializer().deserialize(in, version);
- return new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest);
- }
-
- // Sums the sizes of the fields written by serialize(), using native type sizes.
- public long serializedSize(ReadCommand cmd, int version)
- {
- TypeSizes sizes = TypeSizes.NATIVE;
- SliceByNamesReadCommand command = (SliceByNamesReadCommand) cmd;
- int size = sizes.sizeof(command.isDigestQuery());
- int keySize = command.key.remaining();
-
- CFMetaData metadata = Schema.instance.getCFMetaData(cmd.ksName, cmd.cfName);
-
- size += sizes.sizeof(command.ksName);
- size += sizes.sizeof((short)keySize) + keySize; // short length prefix + key bytes
- size += sizes.sizeof(command.cfName);
- size += sizes.sizeof(cmd.timestamp);
- size += metadata.comparator.namesQueryFilterSerializer().serializedSize(command.filter, version);
-
- return size;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
deleted file mode 100644
index 6995193..0000000
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import com.google.common.base.Objects;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.service.RowDataResolver;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * Single-partition read command selecting cells through a {@code SliceQueryFilter}
- * (command type {@code GET_SLICES}).
- */
-public class SliceFromReadCommand extends ReadCommand
-{
- private static final Logger logger = LoggerFactory.getLogger(SliceFromReadCommand.class);
-
- static final SliceFromReadCommandSerializer serializer = new SliceFromReadCommandSerializer();
-
- public final SliceQueryFilter filter;
-
- public SliceFromReadCommand(String keyspaceName, ByteBuffer key, String cfName, long timestamp, SliceQueryFilter filter)
- {
- super(keyspaceName, key, cfName, timestamp, Type.GET_SLICES);
- this.filter = filter;
- }
-
- // Returns an equivalent command sharing the same filter instance, carrying over the digest-query flag.
- public ReadCommand copy()
- {
- return new SliceFromReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery());
- }
-
- public Row getRow(Keyspace keyspace)
- {
- CFMetaData cfm = Schema.instance.getCFMetaData(ksName, cfName);
- DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
-
- // If we're doing a reversed query and the filter includes static columns, we need to issue two separate
- // reads in order to guarantee that the static columns are fetched. See CASSANDRA-8502 for more details.
- if (filter.reversed && filter.hasStaticSlice(cfm))
- {
- logger.debug("Splitting reversed slice with static columns into two reads");
- // left holds the static slice, right the remaining (normal) slices.
- Pair<SliceQueryFilter, SliceQueryFilter> newFilters = filter.splitOutStaticSlice(cfm);
-
- Row normalResults = keyspace.getRow(new QueryFilter(dk, cfName, newFilters.right, timestamp));
- Row staticResults = keyspace.getRow(new QueryFilter(dk, cfName, newFilters.left, timestamp));
-
- // add the static results to the start of the normal results
- if (normalResults.cf == null)
- return staticResults;
-
- if (staticResults.cf != null)
- for (Cell cell : staticResults.cf.getReverseSortedColumns())
- normalResults.cf.addColumn(cell);
-
- return normalResults;
- }
-
- return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp));
- }
-
- /**
- * Short-read protection: returns a retry command asking for more columns when a replica returned a full page
- * but the merged row has fewer live columns than originally requested, or null when no retry is needed.
- */
- @Override
- public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row)
- {
- int maxLiveColumns = resolver.getMaxLiveCount();
-
- int count = filter.count;
- // We generate a retry if at least one node replied with count live columns but after merge we have fewer
- // than the total number of columns we are interested in (which may be < count on a retry).
- // So in particular, if no host returned count live columns, we know it's not a short read.
- if (maxLiveColumns < count)
- return null;
-
- int liveCountInRow = row == null || row.cf == null ? 0 : filter.getLiveCount(row.cf, timestamp);
- if (liveCountInRow < getOriginalRequestedCount())
- {
- // We asked t (= count) live columns and got l (=liveCountInRow) ones.
- // From that, we can estimate that on this row, for x requested
- // columns, only l/t end up live after reconciliation. So for next
- // round we want to ask x column so that x * (l/t) == t, i.e. x = t^2/l.
- // NOTE(review): count * count is computed in int arithmetic and could overflow for very large counts.
- int retryCount = liveCountInRow == 0 ? count + 1 : ((count * count) / liveCountInRow) + 1;
- SliceQueryFilter newFilter = filter.withUpdatedCount(retryCount);
- return new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, newFilter, getOriginalRequestedCount());
- }
-
- return null;
- }
-
- // Trims the row's cells to the originally requested count (no-op for a missing row or column family).
- @Override
- public void maybeTrim(Row row)
- {
- if ((row == null) || (row.cf == null))
- return;
-
- filter.trim(row.cf, getOriginalRequestedCount(), timestamp);
- }
-
- public IDiskAtomFilter filter()
- {
- return filter;
- }
-
- // NOTE(review): unlike copy(), this does not carry over the digest-query flag.
- public SliceFromReadCommand withUpdatedFilter(SliceQueryFilter newFilter)
- {
- return new SliceFromReadCommand(ksName, key, cfName, timestamp, newFilter);
- }
-
- /**
- * The original number of columns requested by the user.
- * This can be different from count when the slice command is a retry (see
- * RetriedSliceFromReadCommand)
- */
- protected int getOriginalRequestedCount()
- {
- return filter.count;
- }
-
- @Override
- public String toString()
- {
- return Objects.toStringHelper(this)
- .add("ksName", ksName)
- .add("cfName", cfName)
- .add("key", ByteBufferUtil.bytesToHex(key))
- .add("filter", filter)
- .add("timestamp", timestamp)
- .toString();
- }
-}
-
-/**
- * Serializer for {@link SliceFromReadCommand}. Layout: digest flag (boolean), keyspace name (UTF),
- * partition key (short-length-prefixed bytes), table name (UTF), timestamp (long), then the slice filter
- * encoded by the table comparator's slice-filter serializer.
- */
-class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand>
-{
- // NOTE(review): the table metadata is looked up without a null check here, unlike in deserialize().
- public void serialize(ReadCommand rm, DataOutputPlus out, int version) throws IOException
- {
- SliceFromReadCommand realRM = (SliceFromReadCommand)rm;
- out.writeBoolean(realRM.isDigestQuery());
- out.writeUTF(realRM.ksName);
- ByteBufferUtil.writeWithShortLength(realRM.key, out);
- out.writeUTF(realRM.cfName);
- out.writeLong(realRM.timestamp);
- CFMetaData metadata = Schema.instance.getCFMetaData(realRM.ksName, realRM.cfName);
- metadata.comparator.sliceQueryFilterSerializer().serialize(realRM.filter, out, version);
- }
-
- // Reads the fields in serialize() order; an unknown table raises UnknownColumnFamilyException.
- public ReadCommand deserialize(DataInput in, int version) throws IOException
- {
- boolean isDigest = in.readBoolean();
- String keyspaceName = in.readUTF();
- ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
- String cfName = in.readUTF();
- long timestamp = in.readLong();
- CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
- if (metadata == null)
- {
- String message = String.format("Got slice command for nonexistent table %s.%s. If the table was just " +
- "created, this is likely due to the schema not being fully propagated. Please wait for schema " +
- "agreement on table creation.", keyspaceName, cfName);
- throw new UnknownColumnFamilyException(message, null);
- }
- SliceQueryFilter filter = metadata.comparator.sliceQueryFilterSerializer().deserialize(in, version);
- return new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest);
- }
-
- // Sums the sizes of the fields written by serialize(), using native type sizes.
- public long serializedSize(ReadCommand cmd, int version)
- {
- TypeSizes sizes = TypeSizes.NATIVE;
- SliceFromReadCommand command = (SliceFromReadCommand) cmd;
- int keySize = command.key.remaining();
-
- CFMetaData metadata = Schema.instance.getCFMetaData(cmd.ksName, cmd.cfName);
-
- int size = sizes.sizeof(cmd.isDigestQuery()); // boolean
- size += sizes.sizeof(command.ksName);
- size += sizes.sizeof((short) keySize) + keySize; // short length prefix + key bytes
- size += sizes.sizeof(command.cfName);
- size += sizes.sizeof(cmd.timestamp);
- size += metadata.comparator.sliceQueryFilterSerializer().serializedSize(command.filter, version);
-
- return size;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Slices.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slices.java b/src/java/org/apache/cassandra/db/Slices.java
new file mode 100644
index 0000000..ec7797d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/Slices.java
@@ -0,0 +1,898 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * Represents the selection of multiple ranges of rows within a partition.
+ * <p>
+ * A {@code Slices} is basically a list of {@code Slice}, though those are guaranteed to be non-overlapping
+ * and always in clustering order.
+ */
+public abstract class Slices implements Iterable<Slice>
+{
+ /** Serializer for {@code Slices} objects. */
+ public static final Serializer serializer = new Serializer();
+
+ /** Slices selecting all the rows of a partition. */
+ public static final Slices ALL = new SelectAllSlices();
+ /** Slices selecting no rows in a partition. */
+ public static final Slices NONE = new SelectNoSlices();
+
+ // Abstract base: instances are the constants above or the nested implementations (e.g. ArrayBackedSlices).
+ protected Slices()
+ {
+ }
+
+ /**
+ * Creates a {@code Slices} object that contains a single slice.
+ *
+ * @param comparator the comparator for the table {@code slice} is a slice of.
+ * @param slice the single slice that the return object should contains.
+ *
+ * @return the newly created {@code Slices} object.
+ */
+ public static Slices with(ClusteringComparator comparator, Slice slice)
+ {
+ if (slice.start() == Slice.Bound.BOTTOM && slice.end() == Slice.Bound.TOP)
+ return Slices.ALL;
+
+ assert comparator.compare(slice.start(), slice.end()) <= 0;
+ return new ArrayBackedSlices(comparator, new Slice[]{ slice });
+ }
+
+ /**
+ * Whether these slices have a lower bound, that is whether the start of the first slice is something
+ * other than {@code Slice.Bound.BOTTOM} (i.e. it has at least one clustering value).
+ *
+ * @return whether these slices have a lower bound.
+ */
+ public abstract boolean hasLowerBound();
+
+ /**
+ * Whether these slices have an upper bound, that is whether the end of the last slice is something
+ * other than {@code Slice.Bound.TOP} (i.e. it has at least one clustering value).
+ *
+ * @return whether these slices have an upper bound.
+ */
+ public abstract boolean hasUpperBound();
+
+ /**
+ * The number of slices this object contains.
+ *
+ * @return the number of slices this object contains.
+ */
+ public abstract int size();
+
+ /**
+ * Returns the ith slice of this {@code Slices} object.
+ *
+ * @param i the index of the slice to return (slices are in clustering order).
+ * @return the ith slice of this object.
+ */
+ public abstract Slice get(int i);
+
+ /**
+ * Returns slices for continuing the paging of those slices given the last returned clustering prefix.
+ *
+ * @param comparator the comparator for the table this is a filter for.
+ * @param lastReturned the last clustering that was returned for the query we are paging for. The
+ * resulting slices will be such that only results coming strictly after {@code lastReturned} are returned
+ * (where coming after means "greater than" if {@code !reversed} and "lesser than" otherwise).
+ * @param inclusive whether or not we want to include the {@code lastReturned} in the newly returned page of results.
+ * @param reversed whether the query we're paging for is reversed or not.
+ *
+ * @return new slices that select results coming after {@code lastReturned}.
+ */
+ public abstract Slices forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed);
+
+ /**
+ * An object that allows to test whether rows are selected by this {@code Slices} object, assuming those rows
+ * are tested in clustering order.
+ *
+ * @param reversed if true, the rows passed to the returned object will be assumed to be in reversed clustering
+ * order, otherwise they should be in clustering order.
+ *
+ * @return an object that tests for selection of rows by this {@code Slices} object.
+ */
+ public abstract InOrderTester inOrderTester(boolean reversed);
+
+ /**
+ * Whether a given clustering (row) is selected by this {@code Slices} object.
+ *
+ * @param clustering the clustering to test for selection.
+ *
+ * @return whether a given clustering (row) is selected by this {@code Slices} object.
+ */
+ public abstract boolean selects(Clustering clustering);
+
+ /**
+ * Given the per-clustering-column minimum and maximum values an sstable contains, whether or not these slices
+ * potentially intersect that sstable.
+ *
+ * @param minClusteringValues the smallest values for each clustering column that an sstable contains.
+ * @param maxClusteringValues the biggest values for each clustering column that an sstable contains.
+ *
+ * @return whether the slices might intersect with the sstable having {@code minClusteringValues} and
+ * {@code maxClusteringValues}.
+ */
+ public abstract boolean intersects(List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues);
+
+ /**
+ * Given a sliceable row iterator, returns a row iterator that only returns rows selected by the slices of
+ * this {@code Slices} object.
+ *
+ * @param iter the sliceable iterator to filter.
+ *
+ * @return an iterator that only returns the rows (or rather Unfiltered) of {@code iter} that are selected by those slices.
+ */
+ public abstract UnfilteredRowIterator makeSliceIterator(SliceableUnfilteredRowIterator iter);
+
+ /**
+ * Returns a human-readable, CQL-like rendering of the clustering restrictions these slices represent.
+ * <p>
+ * Implementations may only handle the slice shapes that CQL queries generate; other shapes can produce
+ * a rendering that is not a faithful query.
+ *
+ * @param metadata the table metadata, used for the clustering column names and types.
+ * @return the CQL-like restriction string.
+ */
+ public abstract String toCQLString(CFMetaData metadata);
+
+ /**
+ * A simple object that allows testing the inclusion of rows in those slices, assuming those rows
+ * are passed (to {@link #includes}) in clustering order (or in reverse clustering order, depending
+ * on the argument passed to {@link #inOrderTester}).
+ * <p>
+ * Implementations may be stateful (the array-backed ones track their current slice), so a tester
+ * should only be used for a single ordered pass over the rows.
+ */
+ public interface InOrderTester
+ {
+ /**
+ * Whether {@code value} is selected, given that every value previously passed to this tester
+ * came before it in the tester's iteration order.
+ */
+ public boolean includes(Clustering value);
+
+ /**
+ * Whether this tester knows that no further value can be included. Implementations may
+ * conservatively always return {@code false}.
+ */
+ public boolean isDone();
+ }
+
+ /**
+ * Builder to create {@code Slices} objects.
+ */
+ public static class Builder
+ {
+ private final ClusteringComparator comparator;
+
+ private final List<Slice> slices;
+
+ private boolean needsNormalizing;
+
+ public Builder(ClusteringComparator comparator)
+ {
+ this.comparator = comparator;
+ this.slices = new ArrayList<>();
+ }
+
+ public Builder(ClusteringComparator comparator, int initialSize)
+ {
+ this.comparator = comparator;
+ this.slices = new ArrayList<>(initialSize);
+ }
+
+ public Builder add(Slice.Bound start, Slice.Bound end)
+ {
+ return add(Slice.make(start, end));
+ }
+
+ public Builder add(Slice slice)
+ {
+ assert comparator.compare(slice.start(), slice.end()) <= 0;
+ if (slices.size() > 0 && comparator.compare(slices.get(slices.size()-1).end(), slice.start()) > 0)
+ needsNormalizing = true;
+ slices.add(slice);
+ return this;
+ }
+
+ public int size()
+ {
+ return slices.size();
+ }
+
+ public Slices build()
+ {
+ if (slices.isEmpty())
+ return NONE;
+
+ if (slices.size() == 1 && slices.get(0) == Slice.ALL)
+ return ALL;
+
+ List<Slice> normalized = needsNormalizing
+ ? normalize(slices)
+ : slices;
+
+ return new ArrayBackedSlices(comparator, normalized.toArray(new Slice[normalized.size()]));
+ }
+
+ /**
+ * Given an array of slices (potentially overlapping and in any order) and return an equivalent array
+ * of non-overlapping slices in clustering order.
+ *
+ * @param slices an array of slices. This may be modified by this method.
+ * @return the smallest possible array of non-overlapping slices in clustering order. If the original
+ * slices are already non-overlapping and in comparator order, this may or may not return the provided slices
+ * directly.
+ */
+ private List<Slice> normalize(List<Slice> slices)
+ {
+ if (slices.size() <= 1)
+ return slices;
+
+ Collections.sort(slices, new Comparator<Slice>()
+ {
+ @Override
+ public int compare(Slice s1, Slice s2)
+ {
+ int c = comparator.compare(s1.start(), s2.start());
+ if (c != 0)
+ return c;
+
+ return comparator.compare(s1.end(), s2.end());
+ }
+ });
+
+ List<Slice> slicesCopy = new ArrayList<>(slices.size());
+
+ Slice last = slices.get(0);
+
+ for (int i = 1; i < slices.size(); i++)
+ {
+ Slice s2 = slices.get(i);
+
+ boolean includesStart = last.includes(comparator, s2.start());
+ boolean includesFinish = last.includes(comparator, s2.end());
+
+ if (includesStart && includesFinish)
+ continue;
+
+ if (!includesStart && !includesFinish)
+ {
+ slicesCopy.add(last);
+ last = s2;
+ continue;
+ }
+
+ if (includesStart)
+ {
+ last = Slice.make(last.start(), s2.end());
+ continue;
+ }
+
+ assert !includesFinish;
+ }
+
+ slicesCopy.add(last);
+ return slicesCopy;
+ }
+ }
+
+ public static class Serializer
+ {
+ /**
+ * Writes the number of slices (as an int) followed by each slice.
+ */
+ public void serialize(Slices slices, DataOutputPlus out, int version) throws IOException
+ {
+ int size = slices.size();
+ out.writeInt(size);
+
+ if (size == 0)
+ return;
+
+ List<AbstractType<?>> types = clusteringTypes(slices);
+ for (Slice slice : slices)
+ Slice.serializer.serialize(slice, out, version, types);
+ }
+
+ /**
+ * Returns the number of bytes {@link #serialize} writes for {@code slices}.
+ */
+ public long serializedSize(Slices slices, int version, TypeSizes sizes)
+ {
+ int count = slices.size();
+ long size = sizes.sizeof(count);
+
+ if (count == 0)
+ return size;
+
+ List<AbstractType<?>> types = clusteringTypes(slices);
+ for (Slice slice : slices)
+ size += Slice.serializer.serializedSize(slice, version, types, sizes);
+
+ return size;
+ }
+
+ /**
+ * Reads slices written by {@link #serialize}. Zero slices yield {@link Slices#NONE}; a single slice going
+ * from {@code Slice.Bound.BOTTOM} to {@code Slice.Bound.TOP} yields {@link Slices#ALL}.
+ */
+ public Slices deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+ {
+ int size = in.readInt();
+
+ if (size == 0)
+ return NONE;
+
+ // Hoisted: the clustering types are the same for every slice.
+ List<AbstractType<?>> types = metadata.comparator.subtypes();
+ Slice[] slices = new Slice[size];
+ for (int i = 0; i < size; i++)
+ slices[i] = Slice.serializer.deserialize(in, version, types);
+
+ if (size == 1 && slices[0].start() == Slice.Bound.BOTTOM && slices[0].end() == Slice.Bound.TOP)
+ return ALL;
+
+ return new ArrayBackedSlices(metadata.comparator, slices);
+ }
+
+ // The clustering types used to (de)serialize the values of each slice. ALL carries no comparator, but its
+ // single slice has no clustering values, so no types are needed for it.
+ private static List<AbstractType<?>> clusteringTypes(Slices slices)
+ {
+ return slices instanceof SelectAllSlices
+ ? Collections.<AbstractType<?>>emptyList()
+ : ((ArrayBackedSlices)slices).comparator.subtypes();
+ }
+ }
+
+ /**
+ * Simple {@code Slices} implementation that stores its slices in an array.
+ */
+ private static class ArrayBackedSlices extends Slices
+ {
+ private final ClusteringComparator comparator;
+
+ private final Slice[] slices;
+
+ private ArrayBackedSlices(ClusteringComparator comparator, Slice[] slices)
+ {
+ this.comparator = comparator;
+ this.slices = slices;
+ }
+
+ public int size()
+ {
+ return slices.length;
+ }
+
+ public boolean hasLowerBound()
+ {
+ return slices[0].start().size() != 0;
+ }
+
+ public boolean hasUpperBound()
+ {
+ return slices[slices.length - 1].end().size() != 0;
+ }
+
+ public Slice get(int i)
+ {
+ return slices[i];
+ }
+
+ public boolean selects(Clustering clustering)
+ {
+ for (int i = 0; i < slices.length; i++)
+ {
+ Slice slice = slices[i];
+ if (comparator.compare(clustering, slice.start()) < 0)
+ return false;
+
+ if (comparator.compare(clustering, slice.end()) <= 0)
+ return true;
+ }
+ return false;
+ }
+
+ public InOrderTester inOrderTester(boolean reversed)
+ {
+ return reversed ? new InReverseOrderTester() : new InForwardOrderTester();
+ }
+
+ public Slices forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed)
+ {
+ return reversed ? forReversePaging(comparator, lastReturned, inclusive) : forForwardPaging(comparator, lastReturned, inclusive);
+ }
+
+ private Slices forForwardPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive)
+ {
+ for (int i = 0; i < slices.length; i++)
+ {
+ Slice slice = slices[i];
+ Slice newSlice = slice.forPaging(comparator, lastReturned, inclusive, false);
+ if (newSlice == null)
+ continue;
+
+ if (slice == newSlice && i == 0)
+ return this;
+
+ ArrayBackedSlices newSlices = new ArrayBackedSlices(comparator, Arrays.copyOfRange(slices, i, slices.length));
+ newSlices.slices[0] = newSlice;
+ return newSlices;
+ }
+ return Slices.NONE;
+ }
+
+ private Slices forReversePaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive)
+ {
+ for (int i = slices.length - 1; i >= 0; i--)
+ {
+ Slice slice = slices[i];
+ Slice newSlice = slice.forPaging(comparator, lastReturned, inclusive, true);
+ if (newSlice == null)
+ continue;
+
+ if (slice == newSlice && i == slices.length - 1)
+ return this;
+
+ ArrayBackedSlices newSlices = new ArrayBackedSlices(comparator, Arrays.copyOfRange(slices, 0, i + 1));
+ newSlices.slices[i] = newSlice;
+ return newSlices;
+ }
+ return Slices.NONE;
+ }
+
+ public boolean intersects(List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues)
+ {
+ for (Slice slice : this)
+ {
+ if (slice.intersects(comparator, minClusteringValues, maxClusteringValues))
+ return true;
+ }
+ return false;
+ }
+
+ public UnfilteredRowIterator makeSliceIterator(final SliceableUnfilteredRowIterator iter)
+ {
+ return new WrappingUnfilteredRowIterator(iter)
+ {
+ private int nextSlice = iter.isReverseOrder() ? slices.length - 1 : 0;
+ private Iterator<Unfiltered> currentSliceIterator = Collections.emptyIterator();
+
+ private Unfiltered next;
+
+ @Override
+ public boolean hasNext()
+ {
+ prepareNext();
+ return next != null;
+ }
+
+ @Override
+ public Unfiltered next()
+ {
+ prepareNext();
+ Unfiltered toReturn = next;
+ next = null;
+ return toReturn;
+ }
+
+ private boolean hasMoreSlice()
+ {
+ return isReverseOrder()
+ ? nextSlice >= 0
+ : nextSlice < slices.length;
+ }
+
+ private Slice popNextSlice()
+ {
+ return slices[isReverseOrder() ? nextSlice-- : nextSlice++];
+ }
+
+ private void prepareNext()
+ {
+ if (next != null)
+ return;
+
+ while (true)
+ {
+ if (currentSliceIterator.hasNext())
+ {
+ next = currentSliceIterator.next();
+ return;
+ }
+
+ if (!hasMoreSlice())
+ return;
+
+ currentSliceIterator = iter.slice(popNextSlice());
+ }
+ }
+ };
+ }
+
+ public Iterator<Slice> iterator()
+ {
+ return Iterators.forArray(slices);
+ }
+
+ private class InForwardOrderTester implements InOrderTester
+ {
+ private int idx;
+ private boolean inSlice;
+
+ public boolean includes(Clustering value)
+ {
+ while (idx < slices.length)
+ {
+ if (!inSlice)
+ {
+ int cmp = comparator.compare(value, slices[idx].start());
+ // value < start
+ if (cmp < 0)
+ return false;
+
+ inSlice = true;
+
+ if (cmp == 0)
+ return true;
+ }
+
+ // Here, start < value and inSlice
+ if (comparator.compare(value, slices[idx].end()) <= 0)
+ return true;
+
+ ++idx;
+ inSlice = false;
+ }
+ return false;
+ }
+
+ public boolean isDone()
+ {
+ return idx >= slices.length;
+ }
+ }
+
+ private class InReverseOrderTester implements InOrderTester
+ {
+ private int idx;
+ private boolean inSlice;
+
+ public InReverseOrderTester()
+ {
+ this.idx = slices.length - 1;
+ }
+
+ public boolean includes(Clustering value)
+ {
+ while (idx >= 0)
+ {
+ if (!inSlice)
+ {
+ int cmp = comparator.compare(slices[idx].end(), value);
+ // value > end
+ if (cmp > 0)
+ return false;
+
+ inSlice = true;
+
+ if (cmp == 0)
+ return true;
+ }
+
+ // Here, value <= end and inSlice
+ if (comparator.compare(slices[idx].start(), value) <= 0)
+ return true;
+
+ --idx;
+ inSlice = false;
+ }
+ return false;
+ }
+
+ public boolean isDone()
+ {
+ return idx < 0;
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ for (int i = 0; i < slices.length; i++)
+ {
+ if (i > 0)
+ sb.append(", ");
+ sb.append(slices[i].toString(comparator));
+ }
+ return sb.append("}").toString();
+ }
+
+ public String toCQLString(CFMetaData metadata)
+ {
+ StringBuilder sb = new StringBuilder();
+
+ // In CQL, condition are expressed by column, so first group things that way,
+ // i.e. for each column, we create a list of what each slice contains on that column
+ int clusteringSize = metadata.clusteringColumns().size();
+ List<List<ComponentOfSlice>> columnComponents = new ArrayList<>(clusteringSize);
+ for (int i = 0; i < clusteringSize; i++)
+ {
+ List<ComponentOfSlice> perSlice = new ArrayList<>();
+ columnComponents.add(perSlice);
+
+ for (int j = 0; j < slices.length; j++)
+ {
+ ComponentOfSlice c = ComponentOfSlice.fromSlice(i, slices[j]);
+ if (c != null)
+ perSlice.add(c);
+ }
+ }
+
+ boolean needAnd = false;
+ for (int i = 0; i < clusteringSize; i++)
+ {
+ ColumnDefinition column = metadata.clusteringColumns().get(i);
+ List<ComponentOfSlice> componentInfo = columnComponents.get(i);
+ if (componentInfo.isEmpty())
+ break;
+
+ // For a given column, there is only 3 cases that CQL currently generates:
+ // 1) every slice are EQ with the same value, it's a simple '=' relation.
+ // 2) every slice are EQ but with different values, it's a IN relation.
+ // 3) every slice aren't EQ but have the same values, we have inequality relations.
+ // Note that this doesn't cover everything that ReadCommand can express, but
+ // as it's all that CQL support for now, we'll ignore other cases (which would then
+ // display a bogus query but that's not the end of the world).
+ // TODO: we should improve this at some point.
+ ComponentOfSlice first = componentInfo.get(0);
+ if (first.isEQ())
+ {
+ if (needAnd)
+ sb.append(" AND ");
+ needAnd = true;
+
+ sb.append(column.name);
+
+ Set<ByteBuffer> values = new LinkedHashSet<>();
+ for (int j = 0; j < componentInfo.size(); j++)
+ values.add(componentInfo.get(j).startValue);
+
+ if (values.size() == 1)
+ {
+ sb.append(" = ").append(column.type.getString(first.startValue));
+ }
+ else
+ {
+ sb.append(" IN (");
+ int j = 0;
+ for (ByteBuffer value : values)
+ sb.append(j++ == 0 ? "" : ", ").append(column.type.getString(value));
+ sb.append(")");
+ }
+ }
+ else
+ {
+ // As said above, we assume (without checking) that this means all ComponentOfSlice for this column
+ // are the same, so we only bother about the first.
+ if (first.startValue != null)
+ {
+ if (needAnd)
+ sb.append(" AND ");
+ needAnd = true;
+ sb.append(column.name).append(first.startInclusive ? " >= " : " > ").append(column.type.getString(first.startValue));
+ }
+ if (first.endValue != null)
+ {
+ if (needAnd)
+ sb.append(" AND ");
+ needAnd = true;
+ sb.append(column.name).append(first.endInclusive ? " <= " : " < ").append(column.type.getString(first.endValue));
+ }
+ }
+ }
+ return sb.toString();
+ }
+
+ // A somewhat ad hoc utility class, only used by toCQLString. It records the restriction that a
+ // single Slice places on one clustering component: the start/end values and whether each is
+ // inclusive. A null value means the corresponding slice bound has no value at that component.
+ private static class ComponentOfSlice
+ {
+     public final boolean startInclusive;
+     public final ByteBuffer startValue;
+     public final boolean endInclusive;
+     public final ByteBuffer endValue;
+
+     private ComponentOfSlice(boolean startInclusive, ByteBuffer startValue, boolean endInclusive, ByteBuffer endValue)
+     {
+         this.startInclusive = startInclusive;
+         this.startValue = startValue;
+         this.endInclusive = endInclusive;
+         this.endValue = endValue;
+     }
+
+     /**
+      * Extracts the restriction that {@code slice} places on the clustering component at index {@code component}.
+      * <p>
+      * If one of the bounds has no value at that index, the corresponding value is left {@code null}
+      * and its inclusiveness flag is left {@code true}.
+      *
+      * @param component the index of the clustering component to extract.
+      * @param slice the slice to extract it from.
+      * @return the restriction on {@code component}, or {@code null} if neither the start nor the end
+      * bound of {@code slice} has a value at that index.
+      */
+     public static ComponentOfSlice fromSlice(int component, Slice slice)
+     {
+         Slice.Bound start = slice.start();
+         Slice.Bound end = slice.end();
+
+         if (component >= start.size() && component >= end.size())
+             return null;
+
+         boolean startInclusive = true, endInclusive = true;
+         ByteBuffer startValue = null, endValue = null;
+         if (component < start.size())
+         {
+             startInclusive = start.isInclusive();
+             startValue = start.get(component);
+         }
+         if (component < end.size())
+         {
+             endInclusive = end.isInclusive();
+             endValue = end.get(component);
+         }
+         return new ComponentOfSlice(startInclusive, startValue, endInclusive, endValue);
+     }
+
+     /**
+      * Returns whether this component is restricted to a single value. That is the case when both
+      * bounds have a value and those values are equal.
+      * <p>
+      * A one-sided restriction, where only the end bound has a value at this index, is never an
+      * equality. {@code startValue} is checked for {@code null} first so that such components yield
+      * {@code false} instead of throwing a {@code NullPointerException}.
+      */
+     public boolean isEQ()
+     {
+         return startValue != null && startValue.equals(endValue);
+     }
+ }
+ }
+
+ /**
+ * Specialized implementation of {@code Slices} that selects all rows.
+ * <p>
+ * This is equivalent to having the single {@code Slice.ALL} slice, but is somewhat more effecient.
+ */
+ private static class SelectAllSlices extends Slices
+ {
+ private static final InOrderTester trivialTester = new InOrderTester()
+ {
+ public boolean includes(Clustering value)
+ {
+ return true;
+ }
+
+ public boolean isDone()
+ {
+ return false;
+ }
+ };
+
+ public int size()
+ {
+ return 1;
+ }
+
+ public Slice get(int i)
+ {
+ return Slice.ALL;
+ }
+
+ public boolean hasLowerBound()
+ {
+ return false;
+ }
+
+ public boolean hasUpperBound()
+ {
+ return false;
+ }
+
+ public boolean selects(Clustering clustering)
+ {
+ return true;
+ }
+
+ public Slices forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed)
+ {
+ return new ArrayBackedSlices(comparator, new Slice[]{ Slice.ALL.forPaging(comparator, lastReturned, inclusive, reversed) });
+ }
+
+ public InOrderTester inOrderTester(boolean reversed)
+ {
+ return trivialTester;
+ }
+
+ public boolean intersects(List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues)
+ {
+ return true;
+ }
+
+ public UnfilteredRowIterator makeSliceIterator(SliceableUnfilteredRowIterator iter)
+ {
+ return iter;
+ }
+
+ public Iterator<Slice> iterator()
+ {
+ return Iterators.singletonIterator(Slice.ALL);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ALL";
+ }
+
+ public String toCQLString(CFMetaData metadata)
+ {
+ return "";
+ }
+ }
+
+ /**
+  * Specialized implementation of {@code Slices} that selects no rows.
+  * <p>
+  * It contains no slice at all ({@link #size()} is {@code 0}). Every selection or intersection query
+  * answers {@code false}, and paging it yields the same empty instance.
+  */
+ private static class SelectNoSlices extends Slices
+ {
+     // Rejects every clustering and immediately reports that it is done.
+     private static final InOrderTester trivialTester = new InOrderTester()
+     {
+         public boolean includes(Clustering value)
+         {
+             return false;
+         }
+
+         public boolean isDone()
+         {
+             return true;
+         }
+     };
+
+     public int size()
+     {
+         return 0;
+     }
+
+     /**
+      * Always throws: there is no slice to return.
+      *
+      * @throws UnsupportedOperationException unconditionally.
+      */
+     public Slice get(int i)
+     {
+         throw new UnsupportedOperationException();
+     }
+
+     public boolean hasLowerBound()
+     {
+         return false;
+     }
+
+     public boolean hasUpperBound()
+     {
+         return false;
+     }
+
+     // Nothing is selected, so nothing remains to page through: the same empty instance is returned.
+     public Slices forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed)
+     {
+         return this;
+     }
+
+     public boolean selects(Clustering clustering)
+     {
+         return false;
+     }
+
+     public InOrderTester inOrderTester(boolean reversed)
+     {
+         return trivialTester;
+     }
+
+     public boolean intersects(List<ByteBuffer> minClusteringValues, List<ByteBuffer> maxClusteringValues)
+     {
+         return false;
+     }
+
+     // Builds an empty iterator from iter's metadata, partition key and ordering only.
+     // NOTE(review): iter itself is neither returned nor closed here. Confirm that the caller closes it,
+     // and that dropping any other partition-level information iter carries is intended.
+     public UnfilteredRowIterator makeSliceIterator(SliceableUnfilteredRowIterator iter)
+     {
+         return UnfilteredRowIterators.emptyIterator(iter.metadata(), iter.partitionKey(), iter.isReverseOrder());
+     }
+
+     // Uses the JDK's empty iterator; Guava's Iterators.emptyIterator() is deprecated in favor of it.
+     public Iterator<Slice> iterator()
+     {
+         return java.util.Collections.emptyIterator();
+     }
+
+     @Override
+     public String toString()
+     {
+         return "NONE";
+     }
+
+     public String toCQLString(CFMetaData metadata)
+     {
+         return "";
+     }
+ }
+}