You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2016/01/24 04:36:11 UTC
[09/14] cassandra git commit: Integrate SASI index into Cassandra
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/plan/Operation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/Operation.java b/src/java/org/apache/cassandra/index/sasi/plan/Operation.java
new file mode 100644
index 0000000..1857c56
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/plan/Operation.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.plan;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.ColumnDefinition.Kind;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.plan.Expression.Op;
+import org.apache.cassandra.index.sasi.utils.RangeIntersectionIterator;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.*;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class Operation extends RangeIterator<Long, Token>
+{
+    /**
+     * Logical operator used to combine the expressions of a node and the results of its sub-trees.
+     */
+    public enum OperationType
+    {
+        AND, OR;
+
+        /**
+         * Combines two boolean results using this operator.
+         *
+         * @param a the left-hand result.
+         * @param b the right-hand result.
+         * @return {@code a | b} for OR, {@code a & b} for AND.
+         */
+        public boolean apply(boolean a, boolean b)
+        {
+            switch (this)
+            {
+                case OR:
+                    return a | b;
+
+                case AND:
+                    return a & b;
+
+                default:
+                    throw new AssertionError();
+            }
+        }
+    }
+
+    private final QueryController controller;
+
+    protected final OperationType op;
+    // analyzed expressions grouped by column; null for nodes which only join their sub-trees
+    protected final ListMultimap<ColumnDefinition, Expression> expressions;
+    // token iterator backing this node; null-checked before use by computeNext()/performSkipTo()
+    protected final RangeIterator<Long, Token> range;
+
+    protected Operation left, right;
+
+    private Operation(OperationType operation,
+                      QueryController controller,
+                      ListMultimap<ColumnDefinition, Expression> expressions,
+                      RangeIterator<Long, Token> range,
+                      Operation left, Operation right)
+    {
+        super(range);
+
+        this.op = operation;
+        this.controller = controller;
+        this.expressions = expressions;
+        this.range = range;
+
+        this.left = left;
+        this.right = right;
+    }
+
+    /**
+     * Recursive "satisfies" check based on the operation of this node and the data from
+     * the lower level members, using depth-first search and bubbling the results back up
+     * to the top level caller.
+     *
+     * Most of the work is done by {@link #localSatisfiedBy(Unfiltered, boolean)} (see its comment
+     * for details); if there are no local expressions assigned to this Operation it calls
+     * satisfiedBy(Unfiltered, boolean) on its children instead.
+     *
+     * Query: first_name = X AND (last_name = Y OR address = XYZ AND street = IL AND city = C) OR (state = 'CA' AND country = 'US')
+     * Row: key1: (first_name: X, last_name: Z, address: XYZ, street: IL, city: C, state: NY, country:US)
+     *
+     *                     #1 OR
+     *                    /     \
+     * #2 (first_name) AND       AND (state, country)
+     *                    \
+     *      #3 (last_name) OR
+     *                       \
+     *                        #4 AND (address, street, city)
+     *
+     *
+     * Evaluation of the key1 is top-down depth-first search:
+     *
+     * --- going down ---
+     * Level #1 is evaluated, OR expression has to pull results from its children which are at level #2 and OR them together,
+     * Level #2 AND (state, country) could be evaluated right away, AND (first_name) refers to its "right" child from level #3
+     * Level #3 OR (last_name) requests results from level #4
+     * Level #4 AND (address, street, city) does logical AND between its 3 fields, returns result back to level #3.
+     * --- bubbling up ---
+     * Level #3 computes OR between AND (address, street, city) result and its "last_name" expression
+     * Level #2 computes AND between "first_name" and result of level #3, AND (state, country) which is already computed
+     * Level #1 does OR between results of AND (first_name) and AND (state, country) and returns final result.
+     *
+     * @param row The row to check.
+     * @param allowMissingColumns whether a row lacking a value for a locally checked column is evaluated
+     *                            (true) or rejected with an IllegalStateException (false).
+     * @return true if the given Row satisfied all of the expressions in the tree,
+     *         false otherwise.
+     */
+    public boolean satisfiedBy(Unfiltered row, boolean allowMissingColumns)
+    {
+        boolean sideL, sideR;
+
+        if (expressions == null || expressions.isEmpty())
+        {
+            sideL = left != null && left.satisfiedBy(row, allowMissingColumns);
+            sideR = right != null && right.satisfiedBy(row, allowMissingColumns);
+
+            // a missing sub-tree contributes nothing, so the result is fully determined by
+            // the other one; otherwise the absent side would count as "false" and an AND
+            // node with a single child could never be satisfied
+            if (left == null)
+                return sideR;
+
+            if (right == null)
+                return sideL;
+        }
+        else
+        {
+            sideL = localSatisfiedBy(row, allowMissingColumns);
+
+            // if there is no right it means that this expression
+            // is last in the sequence, we can just return result from local expressions
+            if (right == null)
+                return sideL;
+
+            sideR = right.satisfiedBy(row, allowMissingColumns);
+        }
+
+        return op.apply(sideL, sideR);
+    }
+
+    /**
+     * Check every expression in the analyzed list to figure out if the
+     * columns in the given row match all of them based on the operation
+     * set to the current operation node.
+     *
+     * The algorithm is as follows: for every given expression from analyzed
+     * list get corresponding column from the Row:
+     *   - apply {@link Expression#contains(ByteBuffer)}
+     *     method to figure out if it's satisfied;
+     *   - apply logical operation between boolean accumulator and current boolean result;
+     *   - if result == false and node's operation is AND return right away;
+     *
+     * After all of the expressions have been evaluated return resulting accumulator variable.
+     * Partition key columns are skipped; a row for which no column was checked satisfies the node.
+     *
+     * Example:
+     *
+     * Operation = (op: AND, columns: [first_name = p, 5 < age < 7, last_name: y])
+     * Row = (first_name: pavel, last_name: y, age: 6, timestamp: 15)
+     *
+     * #1 get "first_name" = p (expressions)
+     *      - row-get "first_name" => "pavel"
+     *      - compare "pavel" against "p" => true (current)
+     *      - set accumulator current => true (because this is expression #1)
+     *
+     * #2 get "last_name" = y (expressions)
+     *      - row-get "last_name" => "y"
+     *      - compare "y" against "y" => true (current)
+     *      - set accumulator to accumulator & current => true
+     *
+     * #3 get 5 < "age" < 7 (expressions)
+     *      - row-get "age" => "6"
+     *      - compare 5 < 6 < 7 => true (current)
+     *      - set accumulator to accumulator & current => true
+     *
+     * #4 return accumulator => true (row satisfied all of the conditions)
+     *
+     * @param row The row to check.
+     * @param allowMissingColumns if false, a column without a value in the row raises IllegalStateException.
+     * @return true if the given Row satisfied all of the analyzed expressions,
+     *         false otherwise (including when {@code row} is null or not a Row).
+     */
+    private boolean localSatisfiedBy(Unfiltered row, boolean allowMissingColumns)
+    {
+        if (row == null || !row.isRow())
+            return false;
+
+        final int now = FBUtilities.nowInSeconds();
+        boolean result = false;
+        int idx = 0;
+
+        for (ColumnDefinition column : expressions.keySet())
+        {
+            if (column.kind == Kind.PARTITION_KEY)
+                continue;
+
+            ByteBuffer value = ColumnIndex.getValueOf(column, (Row) row, now);
+            boolean isMissingColumn = value == null;
+
+            if (!allowMissingColumns && isMissingColumn)
+                throw new IllegalStateException("All indexed columns should be included into the column slice, missing: " + column);
+
+            boolean isMatch = false;
+            // If there is a column with multiple expressions that effectively means an OR
+            // e.g. comment = 'x y z' could be split into 'comment' EQ 'x', 'comment' EQ 'y', 'comment' EQ 'z'
+            // by analyzer, in situation like that we only need to check if at least one of expressions matches,
+            // and there is no hit on the NOT_EQ (if any) which are always at the end of the filter list.
+            // Loop always starts from the end of the list, which makes it possible to break after the last
+            // NOT_EQ condition on first EQ/RANGE condition satisfied, instead of checking every
+            // single expression in the column filter list.
+            List<Expression> filters = expressions.get(column);
+            for (int i = filters.size() - 1; i >= 0; i--)
+            {
+                Expression expression = filters.get(i);
+                isMatch = !isMissingColumn && expression.contains(value);
+                if (expression.getOp() == Op.NOT_EQ)
+                {
+                    // since this is NOT_EQ operation we have to
+                    // inverse match flag (to check against other expressions),
+                    // and break in case of negative inverse because that means
+                    // that it's a positive hit on the not-eq clause.
+                    isMatch = !isMatch;
+                    if (!isMatch)
+                        break;
+                } // if it was a match on EQ/RANGE or column is missing
+                else if (isMatch || isMissingColumn)
+                    break;
+            }
+
+            if (idx++ == 0)
+            {
+                result = isMatch;
+                continue;
+            }
+
+            result = op.apply(result, isMatch);
+
+            // exit early because we already got a single false
+            if (op == OperationType.AND && !result)
+                return false;
+        }
+
+        return idx == 0 || result;
+    }
+
+    /**
+     * Sorts the given expressions (in place) by column and operator priority and converts them
+     * into SASI {@link Expression}s grouped per column. Analyzed EQ/NOT_EQ values may produce
+     * several expressions per column; range bounds are merged into one expression when the
+     * group is an AND.
+     *
+     * @param controller the query controller used to look up column indexes.
+     * @param op the operation type of the group.
+     * @param expressions the raw row filter expressions; this list is re-ordered by this method.
+     * @return the analyzed expressions keyed by column.
+     */
+    @VisibleForTesting
+    protected static ListMultimap<ColumnDefinition, Expression> analyzeGroup(QueryController controller,
+                                                                             OperationType op,
+                                                                             List<RowFilter.Expression> expressions)
+    {
+        ListMultimap<ColumnDefinition, Expression> analyzed = ArrayListMultimap.create();
+
+        // sort all of the expressions in the operation by name and priority of the logical operator
+        // this gives us an efficient way to handle inequality and combining into ranges without extra processing
+        // and converting expressions from one type to another.
+        Collections.sort(expressions, (a, b) -> {
+            int cmp = a.column().compareTo(b.column());
+            return cmp == 0 ? -Integer.compare(getPriority(a.operator()), getPriority(b.operator())) : cmp;
+        });
+
+        for (final RowFilter.Expression e : expressions)
+        {
+            ColumnIndex columnIndex = controller.getIndex(e);
+            List<Expression> perColumn = analyzed.get(e.column());
+
+            // columns without a SASI index still get an (unindexed) ColumnIndex so they can be filtered
+            if (columnIndex == null)
+                columnIndex = new ColumnIndex(controller.getKeyValidator(), e.column(), null);
+
+            AbstractAnalyzer analyzer = columnIndex.getAnalyzer();
+            analyzer.reset(e.getIndexValue());
+
+            // EQ/NOT_EQ can have multiple expressions e.g. text = "Hello World",
+            // becomes text = "Hello" OR text = "World" because "space" is always interpreted as a split point (by analyzer),
+            // NOT_EQ is made an independent expression only in case of pre-existing multiple EQ expressions, or
+            // if there is no EQ operations and NOT_EQ is met or a single NOT_EQ expression present,
+            // in such case we know exactly that there would be no more EQ/RANGE expressions for given column
+            // since NOT_EQ has the lowest priority.
+            if (e.operator() == Operator.EQ
+                || (e.operator() == Operator.NEQ
+                   && (perColumn.size() == 0 || perColumn.size() > 1
+                       || (perColumn.size() == 1 && perColumn.get(0).getOp() == Op.NOT_EQ))))
+            {
+                while (analyzer.hasNext())
+                {
+                    final ByteBuffer token = analyzer.next();
+                    perColumn.add(new Expression(controller, columnIndex).add(e.operator(), token));
+                }
+            }
+            else
+            // "range" or not-equals operator, combines both bounds together into the single expression,
+            // iff operation of the group is AND, otherwise we are forced to create separate expressions,
+            // not-equals is combined with the range iff operator is AND.
+            {
+                Expression range;
+                if (perColumn.size() == 0 || op != OperationType.AND)
+                    perColumn.add((range = new Expression(controller, columnIndex)));
+                else
+                    range = Iterables.getLast(perColumn);
+
+                while (analyzer.hasNext())
+                    range.add(e.operator(), analyzer.next());
+            }
+        }
+
+        return analyzed;
+    }
+
+    /**
+     * Sort priority of an operator inside a column group; higher values sort first
+     * (EQ, then lower bounds, then upper bounds, then NEQ, then anything else).
+     */
+    private static int getPriority(Operator op)
+    {
+        switch (op)
+        {
+            case EQ:
+                return 4;
+
+            case GTE:
+            case GT:
+                return 3;
+
+            case LTE:
+            case LT:
+                return 2;
+
+            case NEQ:
+                return 1;
+
+            default:
+                return 0;
+        }
+    }
+
+    protected Token computeNext()
+    {
+        return range != null && range.hasNext() ? range.next() : endOfData();
+    }
+
+    protected void performSkipTo(Long nextToken)
+    {
+        if (range != null)
+            range.skipTo(nextToken);
+    }
+
+    /**
+     * Releases the index iterators the controller registered for this node's expressions;
+     * children are not closed here.
+     */
+    public void close() throws IOException
+    {
+        controller.releaseIndexes(this);
+    }
+
+    /**
+     * Mutable builder of an {@link Operation} tree: collects raw row filter expressions and
+     * optional left/right sub-builders, then produces the tree via {@link #complete()}.
+     */
+    public static class Builder
+    {
+        private final QueryController controller;
+
+        protected final OperationType op;
+        protected final List<RowFilter.Expression> expressions;
+
+        protected Builder left, right;
+
+        public Builder(OperationType operation, QueryController controller, RowFilter.Expression... columns)
+        {
+            this.op = operation;
+            this.controller = controller;
+            this.expressions = new ArrayList<>();
+            Collections.addAll(expressions, columns);
+        }
+
+        public Builder setRight(Builder operation)
+        {
+            this.right = operation;
+            return this;
+        }
+
+        public Builder setLeft(Builder operation)
+        {
+            this.left = operation;
+            return this;
+        }
+
+        public void add(RowFilter.Expression e)
+        {
+            expressions.add(e);
+        }
+
+        public void add(Collection<RowFilter.Expression> newExpressions)
+        {
+            // "expressions" is final and always initialised by the constructor
+            expressions.addAll(newExpressions);
+        }
+
+        /**
+         * Builds the operation tree rooted at this builder.
+         *
+         * When this builder has expressions, they form the node's local filter and only the right
+         * sub-builder is attached (the left one is not consulted). Otherwise the node just joins the
+         * left/right sub-trees that produced index iterators.
+         *
+         * @return the root Operation of the built tree.
+         * @throws AssertionError if this builder has no expressions and neither sub-tree produced indexes.
+         */
+        public Operation complete()
+        {
+            if (!expressions.isEmpty())
+            {
+                ListMultimap<ColumnDefinition, Expression> analyzedExpressions = analyzeGroup(controller, op, expressions);
+                RangeIterator.Builder<Long, Token> range = controller.getIndexes(op, analyzedExpressions.values());
+
+                Operation rightOp = null;
+                if (right != null)
+                {
+                    rightOp = right.complete();
+                    range.add(rightOp);
+                }
+
+                return new Operation(op, controller, analyzedExpressions, range.build(), null, rightOp);
+            }
+            else
+            {
+                Operation leftOp = null, rightOp = null;
+                boolean leftIndexes = false, rightIndexes = false;
+
+                if (left != null)
+                {
+                    leftOp = left.complete();
+                    leftIndexes = leftOp != null && leftOp.range != null;
+                }
+
+                if (right != null)
+                {
+                    rightOp = right.complete();
+                    rightIndexes = rightOp != null && rightOp.range != null;
+                }
+
+                RangeIterator<Long, Token> join;
+                /*
+                 * Operation should allow one of its sub-trees to wrap no indexes, that is related to the fact that we
+                 * have to accept defined-but-not-indexed columns as well as key range as IndexExpressions.
+                 *
+                 * Two cases are possible:
+                 *
+                 * only left child produced indexed iterators, that could happen when there are two columns
+                 * or key range on the right:
+                 *
+                 *                AND
+                 *              /     \
+                 *            OR       \
+                 *           /   \     AND
+                 *          a     b   /   \
+                 *                  key   key
+                 *
+                 * only right child produced indexed iterators:
+                 *
+                 *               AND
+                 *              /    \
+                 *            AND     a
+                 *           /   \
+                 *         key   key
+                 */
+                if (leftIndexes && !rightIndexes)
+                    join = leftOp;
+                else if (!leftIndexes && rightIndexes)
+                    join = rightOp;
+                else if (leftIndexes)
+                {
+                    RangeIterator.Builder<Long, Token> builder = op == OperationType.OR
+                                                                ? RangeUnionIterator.<Long, Token>builder()
+                                                                : RangeIntersectionIterator.<Long, Token>builder();
+
+                    join = builder.add(leftOp).add(rightOp).build();
+                }
+                else
+                    throw new AssertionError("both sub-trees have 0 indexes.");
+
+                return new Operation(op, controller, null, join, leftOp, rightOp);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
new file mode 100644
index 0000000..8e10fd0
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.plan;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore.RefViewFragment;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.sasi.SASIIndex;
+import org.apache.cassandra.index.sasi.SSTableIndex;
+import org.apache.cassandra.index.sasi.TermIterator;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.conf.view.View;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.exceptions.TimeQuotaExceededException;
+import org.apache.cassandra.index.sasi.plan.Operation.OperationType;
+import org.apache.cassandra.index.sasi.utils.RangeIntersectionIterator;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.Pair;
+
+public class QueryController
+{
+ private final long executionQuota;
+ private final long executionStart;
+
+ private final ColumnFamilyStore cfs;
+ private final PartitionRangeReadCommand command;
+ private final Map<Collection<Expression>, List<RangeIterator<Long, Token>>> resources = new HashMap<>();
+ private final RefViewFragment scope;
+ private final Set<SSTableReader> sstables;
+
+ public QueryController(ColumnFamilyStore cfs, PartitionRangeReadCommand command, long timeQuotaMs)
+ {
+ this.cfs = cfs;
+ this.command = command;
+ this.executionQuota = TimeUnit.MILLISECONDS.toNanos(timeQuotaMs);
+ this.executionStart = System.nanoTime();
+ this.scope = getSSTableScope(cfs, command);
+ this.sstables = new HashSet<>(scope.sstables);
+ }
+
+ public boolean isForThrift()
+ {
+ return command.isForThrift();
+ }
+
+ public CFMetaData metadata()
+ {
+ return command.metadata();
+ }
+
+ public Collection<RowFilter.Expression> getExpressions()
+ {
+ return command.rowFilter().getExpressions();
+ }
+
+ public DataRange dataRange()
+ {
+ return command.dataRange();
+ }
+
+ public AbstractType<?> getKeyValidator()
+ {
+ return cfs.metadata.getKeyValidator();
+ }
+
+ public ColumnIndex getIndex(RowFilter.Expression expression)
+ {
+ Optional<Index> index = cfs.indexManager.getBestIndexFor(expression);
+ return index.isPresent() ? ((SASIIndex) index.get()).getIndex() : null;
+ }
+
+
+ public UnfilteredRowIterator getPartition(DecoratedKey key, ReadExecutionController executionController)
+ {
+ if (key == null)
+ throw new NullPointerException();
+ try
+ {
+ SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(command.isForThrift(),
+ cfs.metadata,
+ command.nowInSec(),
+ command.columnFilter(),
+ command.rowFilter().withoutExpressions(),
+ DataLimits.NONE,
+ key,
+ command.clusteringIndexFilter(key));
+
+ return partition.queryMemtableAndDisk(cfs, executionController.baseReadOpOrderGroup());
+ }
+ finally
+ {
+ checkpoint();
+ }
+ }
+
+ /**
+ * Build a range iterator from the given list of expressions by applying given operation (OR/AND).
+ * Building of such iterator involves index search, results of which are persisted in the internal resources list
+ * and can be released later via {@link QueryController#releaseIndexes(Operation)}.
+ *
+ * @param op The operation type to coalesce expressions with.
+ * @param expressions The expressions to build range iterator from (expressions with not results are ignored).
+ *
+ * @return The range builder based on given expressions and operation type.
+ */
+ public RangeIterator.Builder<Long, Token> getIndexes(OperationType op, Collection<Expression> expressions)
+ {
+ if (resources.containsKey(expressions))
+ throw new IllegalArgumentException("Can't process the same expressions multiple times.");
+
+ RangeIterator.Builder<Long, Token> builder = op == OperationType.OR
+ ? RangeUnionIterator.<Long, Token>builder()
+ : RangeIntersectionIterator.<Long, Token>builder();
+
+ List<RangeIterator<Long, Token>> perIndexUnions = new ArrayList<>();
+
+ for (Map.Entry<Expression, Set<SSTableIndex>> e : getView(op, expressions).entrySet())
+ {
+ RangeIterator<Long, Token> index = TermIterator.build(e.getKey(), e.getValue());
+
+ if (index == null)
+ continue;
+
+ builder.add(index);
+ perIndexUnions.add(index);
+ }
+
+ resources.put(expressions, perIndexUnions);
+ return builder;
+ }
+
+ public void checkpoint()
+ {
+ if ((System.nanoTime() - executionStart) >= executionQuota)
+ throw new TimeQuotaExceededException();
+ }
+
+ public void releaseIndexes(Operation operation)
+ {
+ if (operation.expressions != null)
+ releaseIndexes(resources.remove(operation.expressions.values()));
+ }
+
+ private void releaseIndexes(List<RangeIterator<Long, Token>> indexes)
+ {
+ if (indexes == null)
+ return;
+
+ indexes.forEach(FileUtils::closeQuietly);
+ }
+
+ public void finish()
+ {
+ try
+ {
+ resources.values().forEach(this::releaseIndexes);
+ }
+ finally
+ {
+ scope.release();
+ }
+ }
+
+ private Map<Expression, Set<SSTableIndex>> getView(OperationType op, Collection<Expression> expressions)
+ {
+ // first let's determine the primary expression if op is AND
+ Pair<Expression, Set<SSTableIndex>> primary = (op == OperationType.AND) ? calculatePrimary(expressions) : null;
+
+ Map<Expression, Set<SSTableIndex>> indexes = new HashMap<>();
+ for (Expression e : expressions)
+ {
+ // NO_EQ and non-index column query should only act as FILTER BY for satisfiedBy(Row) method
+ // because otherwise it likely to go through the whole index.
+ if (!e.isIndexed() || e.getOp() == Expression.Op.NOT_EQ)
+ continue;
+
+ // primary expression, we'll have to add as is
+ if (primary != null && e.equals(primary.left))
+ {
+ indexes.put(primary.left, primary.right);
+ continue;
+ }
+
+ View view = e.index.getView();
+ if (view == null)
+ continue;
+
+ Set<SSTableIndex> readers = new HashSet<>();
+ if (primary != null && primary.right.size() > 0)
+ {
+ for (SSTableIndex index : primary.right)
+ readers.addAll(view.match(index.minKey(), index.maxKey()));
+ }
+ else
+ {
+ readers.addAll(view.match(sstables, e));
+ }
+
+ indexes.put(e, readers);
+ }
+
+ return indexes;
+ }
+
+ private Pair<Expression, Set<SSTableIndex>> calculatePrimary(Collection<Expression> expressions)
+ {
+ Expression expression = null;
+ Set<SSTableIndex> primaryIndexes = Collections.emptySet();
+
+ for (Expression e : expressions)
+ {
+ if (!e.isIndexed())
+ continue;
+
+ View view = e.index.getView();
+ if (view == null)
+ continue;
+
+ Set<SSTableIndex> indexes = view.match(sstables, e);
+ if (primaryIndexes.size() > indexes.size())
+ {
+ primaryIndexes = indexes;
+ expression = e;
+ }
+ }
+
+ return expression == null ? null : Pair.create(expression, primaryIndexes);
+ }
+
+ private static RefViewFragment getSSTableScope(ColumnFamilyStore cfs, PartitionRangeReadCommand command)
+ {
+ return cfs.selectAndReference(org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL, command.dataRange().keyRange()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
new file mode 100644
index 0000000..d34b05a
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.plan;
+
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.plan.Operation.OperationType;
+import org.apache.cassandra.exceptions.RequestTimeoutException;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.AbstractIterator;
+
+public class QueryPlan
+{
+    private final QueryController controller;
+
+    public QueryPlan(ColumnFamilyStore cfs, ReadCommand command, long executionQuotaMs)
+    {
+        this.controller = new QueryController(cfs, (PartitionRangeReadCommand) command, executionQuotaMs);
+    }
+
+    /**
+     * Converts expressions into operation tree (which is currently just a single AND).
+     *
+     * Operation tree allows us to do a couple of important optimizations
+     * namely, group flattening for AND operations (query rewrite), expression bounds checks,
+     * "satisfies by" checks for resulting rows with an early exit.
+     *
+     * If building the tree fails, the controller's resources are released before rethrowing.
+     *
+     * @return root of the operations tree.
+     */
+    private Operation analyze()
+    {
+        try
+        {
+            Operation.Builder and = new Operation.Builder(OperationType.AND, controller);
+            controller.getExpressions().forEach(and::add);
+            return and.complete();
+        }
+        catch (Exception | Error e)
+        {
+            controller.finish();
+            throw e;
+        }
+    }
+
+    /**
+     * Executes the plan; the returned iterator owns the controller and must be closed by the caller.
+     */
+    public UnfilteredPartitionIterator execute(ReadExecutionController executionController) throws RequestTimeoutException
+    {
+        Operation operationTree = analyze();
+        try
+        {
+            return new ResultIterator(operationTree, controller, executionController);
+        }
+        catch (Exception | Error e)
+        {
+            // the result iterator was never handed out, so nobody else would release these resources
+            FileUtils.closeQuietly(operationTree);
+            controller.finish();
+            throw e;
+        }
+    }
+
+    /**
+     * Iterates the tokens produced by the operation tree, reads each matching partition and keeps
+     * only the rows satisfying the tree.
+     */
+    private static class ResultIterator extends AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator
+    {
+        private final AbstractBounds<PartitionPosition> keyRange;
+        private final Operation operationTree;
+        private final QueryController controller;
+        private final ReadExecutionController executionController;
+
+        private Iterator<DecoratedKey> currentKeys = null;
+
+        public ResultIterator(Operation operationTree, QueryController controller, ReadExecutionController executionController)
+        {
+            this.keyRange = controller.dataRange().keyRange();
+            this.operationTree = operationTree;
+            this.controller = controller;
+            this.executionController = executionController;
+            if (operationTree != null)
+                operationTree.skipTo((Long) keyRange.left.getToken().getTokenValue());
+        }
+
+        protected UnfilteredRowIterator computeNext()
+        {
+            if (operationTree == null)
+                return endOfData();
+
+            for (;;)
+            {
+                if (currentKeys == null || !currentKeys.hasNext())
+                {
+                    if (!operationTree.hasNext())
+                        return endOfData();
+
+                    Token token = operationTree.next();
+                    currentKeys = token.iterator();
+                }
+
+                while (currentKeys.hasNext())
+                {
+                    DecoratedKey key = currentKeys.next();
+
+                    // stop at the first key past the right bound, honouring an exclusive right bound
+                    if (!keyRange.right.isMinimum())
+                    {
+                        int cmpRight = keyRange.right.compareTo(key);
+                        if (cmpRight < 0 || (cmpRight == 0 && !keyRange.inclusiveRight()))
+                            return endOfData();
+                    }
+
+                    // skipTo() only positions the tree at the left bound's token, which can be shared
+                    // by keys sorting before (or equal to, when exclusive) the left bound itself
+                    int cmpLeft = keyRange.left.compareTo(key);
+                    if (cmpLeft > 0 || (cmpLeft == 0 && !keyRange.inclusiveLeft()))
+                        continue;
+
+                    try (UnfilteredRowIterator partition = controller.getPartition(key, executionController))
+                    {
+                        List<Unfiltered> clusters = new ArrayList<>();
+                        while (partition.hasNext())
+                        {
+                            Unfiltered row = partition.next();
+                            if (operationTree.satisfiedBy(row, true))
+                                clusters.add(row);
+                        }
+
+                        if (!clusters.isEmpty())
+                            return new PartitionIterator(partition, clusters);
+                    }
+                }
+            }
+        }
+
+        /**
+         * Re-exposes a partition's header (key, deletion, columns, static row, stats) over a
+         * pre-collected list of rows.
+         */
+        private static class PartitionIterator extends AbstractUnfilteredRowIterator
+        {
+            private final Iterator<Unfiltered> rows;
+
+            public PartitionIterator(UnfilteredRowIterator partition, Collection<Unfiltered> content)
+            {
+                super(partition.metadata(),
+                      partition.partitionKey(),
+                      partition.partitionLevelDeletion(),
+                      partition.columns(),
+                      partition.staticRow(),
+                      partition.isReverseOrder(),
+                      partition.stats());
+
+                rows = content.iterator();
+            }
+
+            @Override
+            protected Unfiltered computeNext()
+            {
+                return rows.hasNext() ? rows.next() : endOfData();
+            }
+        }
+
+        public boolean isForThrift()
+        {
+            return controller.isForThrift();
+        }
+
+        public CFMetaData metadata()
+        {
+            return controller.metadata();
+        }
+
+        public void close()
+        {
+            FileUtils.closeQuietly(operationTree);
+            controller.finish();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/ByteTerm.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/ByteTerm.java b/src/java/org/apache/cassandra/index/sasi/sa/ByteTerm.java
new file mode 100644
index 0000000..c7bbab7
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/sa/ByteTerm.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.sa;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public class ByteTerm extends Term<ByteBuffer>
+{
+    public ByteTerm(int position, ByteBuffer value, TokenTreeBuilder tokens)
+    {
+        super(position, value, tokens);
+    }
+
+    /**
+     * @return an independent view of the term bytes; the stored buffer's position/limit are not touched.
+     */
+    public ByteBuffer getTerm()
+    {
+        ByteBuffer view = value.duplicate();
+        return view;
+    }
+
+    /**
+     * @param start offset, relative to the term's current position, at which the suffix begins.
+     * @return a view of the term bytes starting at {@code start}.
+     */
+    public ByteBuffer getSuffix(int start)
+    {
+        ByteBuffer suffix = value.duplicate();
+        suffix.position(value.position() + start);
+        return suffix;
+    }
+
+    /**
+     * Compares the raw bytes of both terms using the supplied column comparator.
+     */
+    public int compareTo(AbstractType<?> comparator, Term other)
+    {
+        ByteBuffer otherBytes = (ByteBuffer) other.value;
+        return comparator.compare(value, otherBytes);
+    }
+
+    /**
+     * @return number of bytes between the term's position and limit.
+     */
+    public int length()
+    {
+        return value.limit() - value.position();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/CharTerm.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/CharTerm.java b/src/java/org/apache/cassandra/index/sasi/sa/CharTerm.java
new file mode 100644
index 0000000..533b566
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/sa/CharTerm.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.sa;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+import com.google.common.base.Charsets;
+
+public class CharTerm extends Term<CharBuffer>
+{
+ public CharTerm(int position, CharBuffer value, TokenTreeBuilder tokens)
+ {
+ super(position, value, tokens);
+ }
+
+ public ByteBuffer getTerm()
+ {
+ return Charsets.UTF_8.encode(value.duplicate());
+ }
+
+ public ByteBuffer getSuffix(int start)
+ {
+ return Charsets.UTF_8.encode(value.subSequence(value.position() + start, value.remaining()));
+ }
+
+ public int compareTo(AbstractType<?> comparator, Term other)
+ {
+ return value.compareTo((CharBuffer) other.value);
+ }
+
+ public int length()
+ {
+ return value.length();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/IntegralSA.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/IntegralSA.java b/src/java/org/apache/cassandra/index/sasi/sa/IntegralSA.java
new file mode 100644
index 0000000..8356585
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/sa/IntegralSA.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.sa;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Pair;
+
+public class IntegralSA extends SA<ByteBuffer>
+{
+ public IntegralSA(AbstractType<?> comparator, OnDiskIndexBuilder.Mode mode)
+ {
+ super(comparator, mode);
+ }
+
+ public Term<ByteBuffer> getTerm(ByteBuffer termValue, TokenTreeBuilder tokens)
+ {
+ return new ByteTerm(charCount, termValue, tokens);
+ }
+
+ public TermIterator finish()
+ {
+ return new IntegralSuffixIterator();
+ }
+
+
+ private class IntegralSuffixIterator extends TermIterator
+ {
+ private final Iterator<Term<ByteBuffer>> termIterator;
+
+ public IntegralSuffixIterator()
+ {
+ Collections.sort(terms, new Comparator<Term<?>>()
+ {
+ public int compare(Term<?> a, Term<?> b)
+ {
+ return a.compareTo(comparator, b);
+ }
+ });
+
+ termIterator = terms.iterator();
+ }
+
+ public ByteBuffer minTerm()
+ {
+ return terms.get(0).getTerm();
+ }
+
+ public ByteBuffer maxTerm()
+ {
+ return terms.get(terms.size() - 1).getTerm();
+ }
+
+ protected Pair<ByteBuffer, TokenTreeBuilder> computeNext()
+ {
+ if (!termIterator.hasNext())
+ return endOfData();
+
+ Term<ByteBuffer> term = termIterator.next();
+ return Pair.create(term.getTerm(), term.getTokens().finish());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/SA.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/SA.java b/src/java/org/apache/cassandra/index/sasi/sa/SA.java
new file mode 100644
index 0000000..75f9f92
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/sa/SA.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.sa;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder.Mode;
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public abstract class SA<T extends Buffer>
+{
+ protected final AbstractType<?> comparator;
+ protected final Mode mode;
+
+ protected final List<Term<T>> terms = new ArrayList<>();
+ protected int charCount = 0;
+
+ public SA(AbstractType<?> comparator, Mode mode)
+ {
+ this.comparator = comparator;
+ this.mode = mode;
+ }
+
+ public Mode getMode()
+ {
+ return mode;
+ }
+
+ public void add(ByteBuffer termValue, TokenTreeBuilder tokens)
+ {
+ Term<T> term = getTerm(termValue, tokens);
+ terms.add(term);
+ charCount += term.length();
+ }
+
+ public abstract TermIterator finish();
+
+ protected abstract Term<T> getTerm(ByteBuffer termValue, TokenTreeBuilder tokens);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java b/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java
new file mode 100644
index 0000000..63f6c5b
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.sa;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Pair;
+
+import com.google.common.base.Charsets;
+import net.mintern.primitive.Primitive;
+
+public class SuffixSA extends SA<CharBuffer>
+{
+ public SuffixSA(AbstractType<?> comparator, OnDiskIndexBuilder.Mode mode)
+ {
+ super(comparator, mode);
+ }
+
+ protected Term<CharBuffer> getTerm(ByteBuffer termValue, TokenTreeBuilder tokens)
+ {
+ return new CharTerm(charCount, Charsets.UTF_8.decode(termValue.duplicate()), tokens);
+ }
+
+ public TermIterator finish()
+ {
+ return new SASuffixIterator();
+ }
+
+ private class SASuffixIterator extends TermIterator
+ {
+ private final long[] suffixes;
+
+ private int current = 0;
+ private ByteBuffer lastProcessedSuffix;
+ private TokenTreeBuilder container;
+
+ public SASuffixIterator()
+ {
+ // each element has term index and char position encoded as two 32-bit integers
+ // to avoid binary search per suffix while sorting suffix array.
+ suffixes = new long[charCount];
+
+ long termIndex = -1, currentTermLength = -1;
+ for (int i = 0; i < charCount; i++)
+ {
+ if (i >= currentTermLength || currentTermLength == -1)
+ {
+ Term currentTerm = terms.get((int) ++termIndex);
+ currentTermLength = currentTerm.getPosition() + currentTerm.length();
+ }
+
+ suffixes[i] = (termIndex << 32) | i;
+ }
+
+ Primitive.sort(suffixes, (a, b) -> {
+ Term aTerm = terms.get((int) (a >>> 32));
+ Term bTerm = terms.get((int) (b >>> 32));
+ return comparator.compare(aTerm.getSuffix(((int) a) - aTerm.getPosition()),
+ bTerm.getSuffix(((int) b) - bTerm.getPosition()));
+ });
+ }
+
+ private Pair<ByteBuffer, TokenTreeBuilder> suffixAt(int position)
+ {
+ long index = suffixes[position];
+ Term term = terms.get((int) (index >>> 32));
+ return Pair.create(term.getSuffix(((int) index) - term.getPosition()), term.getTokens());
+ }
+
+ public ByteBuffer minTerm()
+ {
+ return suffixAt(0).left;
+ }
+
+ public ByteBuffer maxTerm()
+ {
+ return suffixAt(suffixes.length - 1).left;
+ }
+
+ protected Pair<ByteBuffer, TokenTreeBuilder> computeNext()
+ {
+ while (true)
+ {
+ if (current >= suffixes.length)
+ {
+ if (lastProcessedSuffix == null)
+ return endOfData();
+
+ Pair<ByteBuffer, TokenTreeBuilder> result = finishSuffix();
+
+ lastProcessedSuffix = null;
+ return result;
+ }
+
+ Pair<ByteBuffer, TokenTreeBuilder> suffix = suffixAt(current++);
+
+ if (lastProcessedSuffix == null)
+ {
+ lastProcessedSuffix = suffix.left;
+ container = new TokenTreeBuilder(suffix.right.getTokens());
+ }
+ else if (comparator.compare(lastProcessedSuffix, suffix.left) == 0)
+ {
+ lastProcessedSuffix = suffix.left;
+ container.add(suffix.right.getTokens());
+ }
+ else
+ {
+ Pair<ByteBuffer, TokenTreeBuilder> result = finishSuffix();
+
+ lastProcessedSuffix = suffix.left;
+ container = new TokenTreeBuilder(suffix.right.getTokens());
+
+ return result;
+ }
+ }
+ }
+
+ private Pair<ByteBuffer, TokenTreeBuilder> finishSuffix()
+ {
+ return Pair.create(lastProcessedSuffix, container.finish());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/Term.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/Term.java b/src/java/org/apache/cassandra/index/sasi/sa/Term.java
new file mode 100644
index 0000000..fe6eca8
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/sa/Term.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.sa;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public abstract class Term<T extends Buffer>
+{
+ protected final int position;
+ protected final T value;
+ protected TokenTreeBuilder tokens;
+
+
+ public Term(int position, T value, TokenTreeBuilder tokens)
+ {
+ this.position = position;
+ this.value = value;
+ this.tokens = tokens;
+ }
+
+ public int getPosition()
+ {
+ return position;
+ }
+
+ public abstract ByteBuffer getTerm();
+ public abstract ByteBuffer getSuffix(int start);
+
+ public TokenTreeBuilder getTokens()
+ {
+ return tokens;
+ }
+
+ public abstract int compareTo(AbstractType<?> comparator, Term other);
+
+ public abstract int length();
+
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/TermIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/TermIterator.java b/src/java/org/apache/cassandra/index/sasi/sa/TermIterator.java
new file mode 100644
index 0000000..916aa07
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/sa/TermIterator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.sa;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.utils.Pair;
+
+import com.google.common.collect.AbstractIterator;
+
+public abstract class TermIterator extends AbstractIterator<Pair<ByteBuffer, TokenTreeBuilder>>
+{
+ public abstract ByteBuffer minTerm();
+ public abstract ByteBuffer maxTerm();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/AbstractIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/AbstractIterator.java b/src/java/org/apache/cassandra/index/sasi/utils/AbstractIterator.java
new file mode 100644
index 0000000..cf918c1
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/AbstractIterator.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright (C) 2007 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi.utils;
+
+import java.util.NoSuchElementException;
+
+import com.google.common.collect.PeekingIterator;
+
+import static com.google.common.base.Preconditions.checkState;
+
+// This is fork of the Guava AbstractIterator, the only difference
+// is that state & next variables are now protected, this was required
+// for SkippableIterator.skipTo(..) to void all previous state.
+public abstract class AbstractIterator<T> implements PeekingIterator<T>
+{
+ protected State state = State.NOT_READY;
+
+ /** Constructor for use by subclasses. */
+ protected AbstractIterator() {}
+
+ protected enum State
+ {
+ /** We have computed the next element and haven't returned it yet. */
+ READY,
+
+ /** We haven't yet computed or have already returned the element. */
+ NOT_READY,
+
+ /** We have reached the end of the data and are finished. */
+ DONE,
+
+ /** We've suffered an exception and are kaput. */
+ FAILED,
+ }
+
+ protected T next;
+
+ /**
+ * Returns the next element. <b>Note:</b> the implementation must call {@link
+ * #endOfData()} when there are no elements left in the iteration. Failure to
+ * do so could result in an infinite loop.
+ *
+ * <p>The initial invocation of {@link #hasNext()} or {@link #next()} calls
+ * this method, as does the first invocation of {@code hasNext} or {@code
+ * next} following each successful call to {@code next}. Once the
+ * implementation either invokes {@code endOfData} or throws an exception,
+ * {@code computeNext} is guaranteed to never be called again.
+ *
+ * <p>If this method throws an exception, it will propagate outward to the
+ * {@code hasNext} or {@code next} invocation that invoked this method. Any
+ * further attempts to use the iterator will result in an {@link
+ * IllegalStateException}.
+ *
+ * <p>The implementation of this method may not invoke the {@code hasNext},
+ * {@code next}, or {@link #peek()} methods on this instance; if it does, an
+ * {@code IllegalStateException} will result.
+ *
+ * @return the next element if there was one. If {@code endOfData} was called
+ * during execution, the return value will be ignored.
+ * @throws RuntimeException if any unrecoverable error happens. This exception
+ * will propagate outward to the {@code hasNext()}, {@code next()}, or
+ * {@code peek()} invocation that invoked this method. Any further
+ * attempts to use the iterator will result in an
+ * {@link IllegalStateException}.
+ */
+ protected abstract T computeNext();
+
+ /**
+ * Implementations of {@link #computeNext} <b>must</b> invoke this method when
+ * there are no elements left in the iteration.
+ *
+ * @return {@code null}; a convenience so your {@code computeNext}
+ * implementation can use the simple statement {@code return endOfData();}
+ */
+ protected final T endOfData()
+ {
+ state = State.DONE;
+ return null;
+ }
+
+ public final boolean hasNext()
+ {
+ checkState(state != State.FAILED);
+
+ switch (state)
+ {
+ case DONE:
+ return false;
+
+ case READY:
+ return true;
+
+ default:
+ }
+
+ return tryToComputeNext();
+ }
+
+ protected boolean tryToComputeNext()
+ {
+ state = State.FAILED; // temporary pessimism
+ next = computeNext();
+
+ if (state != State.DONE)
+ {
+ state = State.READY;
+ return true;
+ }
+
+ return false;
+ }
+
+ public final T next()
+ {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ state = State.NOT_READY;
+ return next;
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Returns the next element in the iteration without advancing the iteration,
+ * according to the contract of {@link PeekingIterator#peek()}.
+ *
+ * <p>Implementations of {@code AbstractIterator} that wish to expose this
+ * functionality should implement {@code PeekingIterator}.
+ */
+ public final T peek()
+ {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ return next;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java b/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java
new file mode 100644
index 0000000..2bf5a07
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.cassandra.index.sasi.disk.OnDiskIndex.DataTerm;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.disk.TokenTree;
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
+
+public class CombinedTerm implements CombinedValue<DataTerm>
+{
+ private final AbstractType<?> comparator;
+ private final DataTerm term;
+ private final TreeMap<Long, LongSet> tokens;
+
+ public CombinedTerm(AbstractType<?> comparator, DataTerm term)
+ {
+ this.comparator = comparator;
+ this.term = term;
+ this.tokens = new TreeMap<>();
+
+ RangeIterator<Long, Token> tokens = term.getTokens();
+ while (tokens.hasNext())
+ {
+ Token current = tokens.next();
+ LongSet offsets = this.tokens.get(current.get());
+ if (offsets == null)
+ this.tokens.put(current.get(), (offsets = new LongOpenHashSet()));
+
+ for (Long offset : ((TokenTree.OnDiskToken) current).getOffsets())
+ offsets.add(offset);
+ }
+ }
+
+ public ByteBuffer getTerm()
+ {
+ return term.getTerm();
+ }
+
+ public Map<Long, LongSet> getTokens()
+ {
+ return tokens;
+ }
+
+ public TokenTreeBuilder getTokenTreeBuilder()
+ {
+ return new TokenTreeBuilder(tokens).finish();
+ }
+
+ public void merge(CombinedValue<DataTerm> other)
+ {
+ if (!(other instanceof CombinedTerm))
+ return;
+
+ CombinedTerm o = (CombinedTerm) other;
+
+ assert comparator == o.comparator;
+
+ for (Map.Entry<Long, LongSet> token : o.tokens.entrySet())
+ {
+ LongSet offsets = this.tokens.get(token.getKey());
+ if (offsets == null)
+ this.tokens.put(token.getKey(), (offsets = new LongOpenHashSet()));
+
+ for (LongCursor offset : token.getValue())
+ offsets.add(offset.value);
+ }
+ }
+
+ public DataTerm get()
+ {
+ return term;
+ }
+
+ public int compareTo(CombinedValue<DataTerm> o)
+ {
+ return term.compareTo(comparator, o.get().getTerm());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/CombinedTermIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/CombinedTermIterator.java b/src/java/org/apache/cassandra/index/sasi/utils/CombinedTermIterator.java
new file mode 100644
index 0000000..06c27bf
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/CombinedTermIterator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.index.sasi.disk.Descriptor;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndex;
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.index.sasi.sa.TermIterator;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Pair;
+
+public class CombinedTermIterator extends TermIterator
+{
+ final Descriptor descriptor;
+ final RangeIterator<OnDiskIndex.DataTerm, CombinedTerm> union;
+ final ByteBuffer min;
+ final ByteBuffer max;
+
+ public CombinedTermIterator(OnDiskIndex... sas)
+ {
+ this(Descriptor.CURRENT, sas);
+ }
+
+ public CombinedTermIterator(Descriptor d, OnDiskIndex... parts)
+ {
+ descriptor = d;
+ union = OnDiskIndexIterator.union(parts);
+
+ AbstractType<?> comparator = parts[0].getComparator(); // assumes all SAs have same comparator
+ ByteBuffer minimum = parts[0].minTerm();
+ ByteBuffer maximum = parts[0].maxTerm();
+
+ for (int i = 1; i < parts.length; i++)
+ {
+ OnDiskIndex part = parts[i];
+ if (part == null)
+ continue;
+
+ minimum = comparator.compare(minimum, part.minTerm()) > 0 ? part.minTerm() : minimum;
+ maximum = comparator.compare(maximum, part.maxTerm()) < 0 ? part.maxTerm() : maximum;
+ }
+
+ min = minimum;
+ max = maximum;
+ }
+
+ public ByteBuffer minTerm()
+ {
+ return min;
+ }
+
+ public ByteBuffer maxTerm()
+ {
+ return max;
+ }
+
+ protected Pair<ByteBuffer, TokenTreeBuilder> computeNext()
+ {
+ if (!union.hasNext())
+ {
+ return endOfData();
+ }
+ else
+ {
+ CombinedTerm term = union.next();
+ return Pair.create(term.getTerm(), term.getTokenTreeBuilder());
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/CombinedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/CombinedValue.java b/src/java/org/apache/cassandra/index/sasi/utils/CombinedValue.java
new file mode 100644
index 0000000..ca5f9be
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/CombinedValue.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
/**
 * A comparable value that can absorb other values of the same kind; for example, CombinedTerm
 * folds another term's token offsets into its own when merged.
 *
 * @param <V> type of the wrapped value
 */
public interface CombinedValue<V> extends Comparable<CombinedValue<V>>
{
    /** @return the wrapped value */
    V get();

    /**
     * Folds {@code other} into this value.
     *
     * @param other the value to merge into this one
     */
    void merge(CombinedValue<V> other);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/MappedBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/MappedBuffer.java b/src/java/org/apache/cassandra/index/sasi/utils/MappedBuffer.java
new file mode 100644
index 0000000..37ab1be
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/MappedBuffer.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel.MapMode;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class MappedBuffer implements Closeable
+{
+ private final MappedByteBuffer[] pages;
+
+ private long position, limit;
+ private final long capacity;
+ private final int pageSize, sizeBits;
+
+ private MappedBuffer(MappedBuffer other)
+ {
+ this.sizeBits = other.sizeBits;
+ this.pageSize = other.pageSize;
+ this.position = other.position;
+ this.limit = other.limit;
+ this.capacity = other.capacity;
+ this.pages = other.pages;
+ }
+
+ public MappedBuffer(RandomAccessReader file)
+ {
+ this(file.getChannel(), 30);
+ }
+
+ public MappedBuffer(ChannelProxy file)
+ {
+ this(file, 30);
+ }
+
+ @VisibleForTesting
+ protected MappedBuffer(ChannelProxy file, int numPageBits)
+ {
+ if (numPageBits > Integer.SIZE - 1)
+ throw new IllegalArgumentException("page size can't be bigger than 1G");
+
+ sizeBits = numPageBits;
+ pageSize = 1 << sizeBits;
+ position = 0;
+ limit = capacity = file.size();
+ pages = new MappedByteBuffer[(int) (file.size() / pageSize) + 1];
+
+ try
+ {
+ long offset = 0;
+ for (int i = 0; i < pages.length; i++)
+ {
+ long pageSize = Math.min(this.pageSize, (capacity - offset));
+ pages[i] = file.map(MapMode.READ_ONLY, offset, pageSize);
+ offset += pageSize;
+ }
+ }
+ finally
+ {
+ file.close();
+ }
+ }
+
+ public int comparePageTo(long offset, int length, AbstractType<?> comparator, ByteBuffer other)
+ {
+ return comparator.compare(getPageRegion(offset, length), other);
+ }
+
+ public long capacity()
+ {
+ return capacity;
+ }
+
+ public long position()
+ {
+ return position;
+ }
+
+ public MappedBuffer position(long newPosition)
+ {
+ if (newPosition < 0 || newPosition > limit)
+ throw new IllegalArgumentException("position: " + newPosition + ", limit: " + limit);
+
+ position = newPosition;
+ return this;
+ }
+
+ public long limit()
+ {
+ return limit;
+ }
+
+ public MappedBuffer limit(long newLimit)
+ {
+ if (newLimit < position || newLimit > capacity)
+ throw new IllegalArgumentException();
+
+ limit = newLimit;
+ return this;
+ }
+
+ public long remaining()
+ {
+ return limit - position;
+ }
+
+ public boolean hasRemaining()
+ {
+ return remaining() > 0;
+ }
+
+ public byte get()
+ {
+ return get(position++);
+ }
+
+ public byte get(long pos)
+ {
+ return pages[getPage(pos)].get(getPageOffset(pos));
+ }
+
+ public short getShort()
+ {
+ short value = getShort(position);
+ position += 2;
+ return value;
+ }
+
+ public short getShort(long pos)
+ {
+ if (isPageAligned(pos, 2))
+ return pages[getPage(pos)].getShort(getPageOffset(pos));
+
+ int ch1 = get(pos) & 0xff;
+ int ch2 = get(pos + 1) & 0xff;
+ return (short) ((ch1 << 8) + ch2);
+ }
+
+ public int getInt()
+ {
+ int value = getInt(position);
+ position += 4;
+ return value;
+ }
+
+ public int getInt(long pos)
+ {
+ if (isPageAligned(pos, 4))
+ return pages[getPage(pos)].getInt(getPageOffset(pos));
+
+ int ch1 = get(pos) & 0xff;
+ int ch2 = get(pos + 1) & 0xff;
+ int ch3 = get(pos + 2) & 0xff;
+ int ch4 = get(pos + 3) & 0xff;
+
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4);
+ }
+
+ public long getLong()
+ {
+ long value = getLong(position);
+ position += 8;
+ return value;
+ }
+
+
+ public long getLong(long pos)
+ {
+ // fast path if the long could be retrieved from a single page
+ // that would avoid multiple expensive look-ups into page array.
+ return (isPageAligned(pos, 8))
+ ? pages[getPage(pos)].getLong(getPageOffset(pos))
+ : ((long) (getInt(pos)) << 32) + (getInt(pos + 4) & 0xFFFFFFFFL);
+ }
+
+ public ByteBuffer getPageRegion(long position, int length)
+ {
+ if (!isPageAligned(position, length))
+ throw new IllegalArgumentException(String.format("range: %s-%s wraps more than one page", position, length));
+
+ ByteBuffer slice = pages[getPage(position)].duplicate();
+
+ int pageOffset = getPageOffset(position);
+ slice.position(pageOffset).limit(pageOffset + length);
+
+ return slice;
+ }
+
+ public MappedBuffer duplicate()
+ {
+ return new MappedBuffer(this);
+ }
+
+ public void close()
+ {
+ if (!FileUtils.isCleanerAvailable())
+ return;
+
+ /*
+ * Try forcing the unmapping of pages using undocumented unsafe sun APIs.
+ * If this fails (non Sun JVM), we'll have to wait for the GC to finalize the mapping.
+ * If this works and a thread tries to access any page, hell will unleash on earth.
+ */
+ try
+ {
+ for (MappedByteBuffer segment : pages)
+ FileUtils.clean(segment);
+ }
+ catch (Exception e)
+ {
+ // This is not supposed to happen
+ }
+ }
+
+ private int getPage(long position)
+ {
+ return (int) (position >> sizeBits);
+ }
+
+ private int getPageOffset(long position)
+ {
+ return (int) (position & pageSize - 1);
+ }
+
+ private boolean isPageAligned(long position, int length)
+ {
+ return pageSize - (getPageOffset(position) + length) > 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/OnDiskIndexIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/OnDiskIndexIterator.java b/src/java/org/apache/cassandra/index/sasi/utils/OnDiskIndexIterator.java
new file mode 100644
index 0000000..ae97cab
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/OnDiskIndexIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.cassandra.index.sasi.disk.OnDiskIndex;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndex.DataTerm;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public class OnDiskIndexIterator extends RangeIterator<DataTerm, CombinedTerm>
+{
+ private final AbstractType<?> comparator;
+ private final Iterator<DataTerm> terms;
+
+ public OnDiskIndexIterator(OnDiskIndex index)
+ {
+ super(index.min(), index.max(), Long.MAX_VALUE);
+
+ this.comparator = index.getComparator();
+ this.terms = index.iterator();
+ }
+
+ public static RangeIterator<DataTerm, CombinedTerm> union(OnDiskIndex... union)
+ {
+ RangeUnionIterator.Builder<DataTerm, CombinedTerm> builder = RangeUnionIterator.builder();
+ for (OnDiskIndex e : union)
+ {
+ if (e != null)
+ builder.add(new OnDiskIndexIterator(e));
+ }
+
+ return builder.build();
+ }
+
+ protected CombinedTerm computeNext()
+ {
+ return terms.hasNext() ? new CombinedTerm(comparator, terms.next()) : endOfData();
+ }
+
+ protected void performSkipTo(DataTerm nextToken)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void close() throws IOException
+ {}
+}