You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2016/01/24 04:36:11 UTC

[09/14] cassandra git commit: Integrate SASI index into Cassandra

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/plan/Operation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/Operation.java b/src/java/org/apache/cassandra/index/sasi/plan/Operation.java
new file mode 100644
index 0000000..1857c56
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/plan/Operation.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.plan;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.ColumnDefinition.Kind;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.plan.Expression.Op;
+import org.apache.cassandra.index.sasi.utils.RangeIntersectionIterator;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.*;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class Operation extends RangeIterator<Long, Token>
+{
+    /**
+     * Logical operator which joins the expressions (and sub-trees) of an operation node.
+     */
+    public enum OperationType
+    {
+        AND, OR;
+
+        /**
+         * Combines two already evaluated operands with this operator.
+         *
+         * @param a The left operand.
+         * @param b The right operand.
+         * @return {@code a | b} for {@link #OR}, {@code a & b} for {@link #AND}.
+         */
+        public boolean apply(boolean a, boolean b)
+        {
+            if (this == OR)
+                return a | b;
+
+            if (this == AND)
+                return a & b;
+
+            // not reachable while the enum only declares AND and OR
+            throw new AssertionError();
+        }
+    }
+
+    private final QueryController controller;
+
+    protected final OperationType op;
+    protected final ListMultimap<ColumnDefinition, Expression> expressions;
+    protected final RangeIterator<Long, Token> range;
+
+    protected Operation left, right;
+
+    /**
+     * Creates a single node of the operation tree.
+     *
+     * @param operation The logical operator of this node.
+     * @param controller The controller asked to release the indexes of this node in {@link #close()}.
+     * @param expressions The analyzed expressions local to this node, may be {@code null}.
+     * @param range The iterator this node delegates to, it is also handed to the super constructor.
+     * @param left The left sub-tree, may be {@code null}.
+     * @param right The right sub-tree, may be {@code null}.
+     */
+    private Operation(OperationType operation,
+                      QueryController controller,
+                      ListMultimap<ColumnDefinition, Expression> expressions,
+                      RangeIterator<Long, Token> range,
+                      Operation left, Operation right)
+    {
+        super(range);
+
+        // collaborators
+        this.controller = controller;
+        this.range = range;
+
+        // what this node evaluates locally
+        this.op = operation;
+        this.expressions = expressions;
+
+        // children of this node
+        this.right = right;
+        this.left = left;
+    }
+
+    /**
+     * Recursive "satisfies" checks based on operation
+     * and data from the lower level members using depth-first search
+     * and bubbling the results back to the top level caller.
+     *
+     * Most of the work here is done by {@link #localSatisfiedBy(Unfiltered, boolean)}
+     * see it's comment for details, if there are no local expressions
+     * assigned to Operation it will call satisfiedBy(Row) on it's children.
+     *
+     * Query: first_name = X AND (last_name = Y OR address = XYZ AND street = IL AND city = C) OR (state = 'CA' AND country = 'US')
+     * Row: key1: (first_name: X, last_name: Z, address: XYZ, street: IL, city: C, state: NY, country:US)
+     *
+     * #1                       OR
+     *                        /    \
+     * #2       (first_name) AND   AND (state, country)
+     *                          \
+     * #3            (last_name) OR
+     *                             \
+     * #4                          AND (address, street, city)
+     *
+     *
+     * Evaluation of the key1 is top-down depth-first search:
+     *
+     * --- going down ---
+     * Level #1 is evaluated, OR expression has to pull results from it's children which are at level #2 and OR them together,
+     * Level #2 AND (state, country) could be be evaluated right away, AND (first_name) refers to it's "right" child from level #3
+     * Level #3 OR (last_name) requests results from level #4
+     * Level #4 AND (address, street, city) does logical AND between it's 3 fields, returns result back to level #3.
+     * --- bubbling up ---
+     * Level #3 computes OR between AND (address, street, city) result and it's "last_name" expression
+     * Level #2 computes AND between "first_name" and result of level #3, AND (state, country) which is already computed
+     * Level #1 does OR between results of AND (first_name) and AND (state, country) and returns final result.
+     *
+     * @param row The row to check.
+     * @return true if give Row satisfied all of the expressions in the tree,
+     *         false otherwise.
+     */
+    public boolean satisfiedBy(Unfiltered row, boolean allowMissingColumns)
+    {
+        boolean sideL, sideR;
+
+        if (expressions == null || expressions.isEmpty())
+        {
+            sideL =  left != null &&  left.satisfiedBy(row, allowMissingColumns);
+            sideR = right != null && right.satisfiedBy(row, allowMissingColumns);
+
+            // one of the expressions was skipped
+            // because it had no indexes attached
+            if (left == null)
+                return sideR;
+        }
+        else
+        {
+            sideL = localSatisfiedBy(row, allowMissingColumns);
+
+            // if there is no right it means that this expression
+            // is last in the sequence, we can just return result from local expressions
+            if (right == null)
+                return sideL;
+
+            sideR = right.satisfiedBy(row, allowMissingColumns);
+        }
+
+
+        return op.apply(sideL, sideR);
+    }
+
+    /**
+     * Check every expression in the analyzed list to figure out if the
+     * columns in the given row match them, combining the per-column results
+     * with the operation (AND/OR) set to the current operation node.
+     *
+     * The algorithm is as follows: for every given expression from analyzed
+     * list get corresponding column from the Row:
+     *   - apply {@link Expression#contains(ByteBuffer)}
+     *     method to figure out if it's satisfied;
+     *   - apply logical operation between boolean accumulator and current boolean result;
+     *   - if result == false and node's operation is AND return right away;
+     *
+     * After all of the expressions have been evaluated return resulting accumulator variable.
+     *
+     * Partition key columns are skipped; if no column was evaluated at all
+     * the row is reported as satisfying the node. A null argument, or an
+     * Unfiltered which is not a row, never matches.
+     *
+     * Example:
+     *
+     * Operation = (op: AND, columns: [first_name = p, 5 < age < 7, last_name: y])
+     * Row = (first_name: pavel, last_name: y, age: 6, timestamp: 15)
+     *
+     * #1 get "first_name" = p (expressions)
+     *      - row-get "first_name"                      => "pavel"
+     *      - compare "pavel" against "p"               => true (current)
+     *      - set accumulator current                   => true (because this is expression #1)
+     *
+     * #2 get "last_name" = y (expressions)
+     *      - row-get "last_name"                       => "y"
+     *      - compare "y" against "y"                   => true (current)
+     *      - set accumulator to accumulator & current  => true
+     *
+     * #3 get 5 < "age" < 7 (expressions)
+     *      - row-get "age"                             => "6"
+     *      - compare 5 < 6 < 7                         => true (current)
+     *      - set accumulator to accumulator & current  => true
+     *
+     * #4 return accumulator => true (row satisfied all of the conditions)
+     *
+     * @param row The row to check.
+     * @param allowMissingColumns When false, a column without a value in the row
+     *                            raises an exception; when true such a column
+     *                            simply does not match its EQ/RANGE expressions.
+     * @return true if the given Row satisfies the analyzed expressions,
+     *         false otherwise.
+     * @throws IllegalStateException if a column has no value in the row and
+     *                               allowMissingColumns is false.
+     */
+    private boolean localSatisfiedBy(Unfiltered row, boolean allowMissingColumns)
+    {
+        if (row == null || !row.isRow())
+            return false;
+
+        final int now = FBUtilities.nowInSeconds();
+        boolean result = false; // accumulator over the per-column results
+        int idx = 0; // number of columns evaluated so far
+
+        for (ColumnDefinition column : expressions.keySet())
+        {
+            if (column.kind == Kind.PARTITION_KEY)
+                continue;
+
+            ByteBuffer value = ColumnIndex.getValueOf(column, (Row) row, now);
+            boolean isMissingColumn = value == null;
+
+            if (!allowMissingColumns && isMissingColumn)
+                throw new IllegalStateException("All indexed columns should be included into the column slice, missing: " + column);
+
+            boolean isMatch = false;
+            // If there is a column with multiple expressions that effectively means an OR
+            // e.g. comment = 'x y z' could be split into 'comment' EQ 'x', 'comment' EQ 'y', 'comment' EQ 'z'
+            // by analyzer, in situation like that we only need to check if at least one of expressions matches,
+            // and there is no hit on the NOT_EQ (if any) which are always at the end of the filter list.
+            // Loop always starts from the end of the list, which makes it possible to break after the last
+            // NOT_EQ condition on first EQ/RANGE condition satisfied, instead of checking every
+            // single expression in the column filter list.
+            List<Expression> filters = expressions.get(column);
+            for (int i = filters.size() - 1; i >= 0; i--)
+            {
+                Expression expression = filters.get(i);
+                isMatch = !isMissingColumn && expression.contains(value);
+                if (expression.getOp() == Op.NOT_EQ)
+                {
+                    // since this is NOT_EQ operation we have to
+                    // inverse match flag (to check against other expressions),
+                    // and break in case of negative inverse because that means
+                    // that it's a positive hit on the not-eq clause.
+                    isMatch = !isMatch;
+                    if (!isMatch)
+                        break;
+                } // if it was a match on EQ/RANGE or column is missing
+                else if (isMatch || isMissingColumn)
+                    break;
+            }
+
+            // the first evaluated column seeds the accumulator
+            if (idx++ == 0)
+            {
+                result = isMatch;
+                continue;
+            }
+
+            result = op.apply(result, isMatch);
+
+            // exit early because we already got a single false
+            if (op == OperationType.AND && !result)
+                return false;
+        }
+
+        // idx == 0 means that no column was evaluated (e.g. only partition key columns)
+        return idx == 0 || result;
+    }
+
+    /**
+     * Converts the row filter expressions of a single operation group into
+     * SASI expressions, grouped by the column they apply to.
+     *
+     * The given list is sorted in place (by column, then by descending operator
+     * priority, see {@link #getPriority(Operator)}) before it is processed.
+     * The index value of every expression is passed through the analyzer of its
+     * column index, so that a single input expression may produce several terms.
+     *
+     * @param controller The controller used to look up the column index of an expression.
+     * @param op The operation type of the group; range bounds of the same column
+     *           are merged into a single expression only for AND.
+     * @param expressions The expressions of the group, re-ordered by this method.
+     * @return The analyzed expressions keyed by column.
+     */
+    @VisibleForTesting
+    protected static ListMultimap<ColumnDefinition, Expression> analyzeGroup(QueryController controller,
+                                                                             OperationType op,
+                                                                             List<RowFilter.Expression> expressions)
+    {
+        ListMultimap<ColumnDefinition, Expression> analyzed = ArrayListMultimap.create();
+
+        // sort all of the expressions in the operation by name and priority of the logical operator
+        // this gives us an efficient way to handle inequality and combining into ranges without extra processing
+        // and converting expressions from one type to another.
+        Collections.sort(expressions, (a, b) -> {
+            int cmp = a.column().compareTo(b.column());
+            return cmp == 0 ? -Integer.compare(getPriority(a.operator()), getPriority(b.operator())) : cmp;
+        });
+
+        for (final RowFilter.Expression e : expressions)
+        {
+            ColumnIndex columnIndex = controller.getIndex(e);
+            List<Expression> perColumn = analyzed.get(e.column());
+
+            // the controller knows no index for this column, create a ColumnIndex for it on the fly
+            if (columnIndex == null)
+                columnIndex = new ColumnIndex(controller.getKeyValidator(), e.column(), null);
+
+            AbstractAnalyzer analyzer = columnIndex.getAnalyzer();
+            analyzer.reset(e.getIndexValue());
+
+            // EQ/NOT_EQ can have multiple expressions e.g. text = "Hello World",
+            // becomes text = "Hello" OR text = "World" because "space" is always interpreted as a split point (by analyzer),
+            // NOT_EQ is made an independent expression only in case of pre-existing multiple EQ expressions, or
+            // if there is no EQ operations and NOT_EQ is met or a single NOT_EQ expression present,
+            // in such case we know exactly that there would be no more EQ/RANGE expressions for given column
+            // since NOT_EQ has the lowest priority.
+            if (e.operator() == Operator.EQ
+                    || (e.operator() == Operator.NEQ
+                       && (perColumn.size() == 0 || perColumn.size() > 1
+                           || (perColumn.size() == 1 && perColumn.get(0).getOp() == Op.NOT_EQ))))
+            {
+                // one independent expression per term produced by the analyzer
+                while (analyzer.hasNext())
+                {
+                    final ByteBuffer token = analyzer.next();
+                    perColumn.add(new Expression(controller, columnIndex).add(e.operator(), token));
+                }
+            }
+            else
+            // "range" or not-equals operator, combines both bounds together into the single expression,
+            // iff operation of the group is AND, otherwise we are forced to create separate expressions,
+            // not-equals is combined with the range iff operator is AND.
+            {
+                Expression range;
+                if (perColumn.size() == 0 || op != OperationType.AND)
+                    perColumn.add((range = new Expression(controller, columnIndex)));
+                else
+                    range = Iterables.getLast(perColumn);
+
+                while (analyzer.hasNext())
+                    range.add(e.operator(), analyzer.next());
+            }
+        }
+
+        return analyzed;
+    }
+
+    private static int getPriority(Operator op)
+    {
+        switch (op)
+        {
+            case EQ:
+                return 4;
+
+            case GTE:
+            case GT:
+                return 3;
+
+            case LTE:
+            case LT:
+                return 2;
+
+            case NEQ:
+                return 1;
+
+            default:
+                return 0;
+        }
+    }
+
+    /**
+     * Pulls the next token from the wrapped range.
+     *
+     * @return The next token, or the end-of-data marker if there is
+     *         no range or the range has no more elements.
+     */
+    protected Token computeNext()
+    {
+        if (range == null || !range.hasNext())
+            return endOfData();
+
+        return range.next();
+    }
+
+    /**
+     * Forwards the skip request to the wrapped range, does nothing if there is no range.
+     *
+     * @param nextToken The token to skip to.
+     */
+    protected void performSkipTo(Long nextToken)
+    {
+        if (range == null)
+            return;
+
+        range.skipTo(nextToken);
+    }
+
+    /**
+     * Asks the controller to release the indexes registered for this operation.
+     * Child operations are not touched by this method.
+     *
+     * @throws IOException declared by the signature; the visible body does not throw it itself.
+     */
+    public void close() throws IOException
+    {
+        controller.releaseIndexes(this);
+    }
+
+    public static class Builder
+    {
+        private final QueryController controller;
+
+        protected final OperationType op;
+        protected final List<RowFilter.Expression> expressions;
+
+        protected Builder left, right;
+
+        /**
+         * Creates a builder for an operation node.
+         *
+         * @param operation The logical operator of the node.
+         * @param controller The controller handed to the resulting operation.
+         * @param columns The initial expressions of the node, more can be added later.
+         */
+        public Builder(OperationType operation, QueryController controller, RowFilter.Expression... columns)
+        {
+            this.controller = controller;
+            this.op = operation;
+            // mutable copy, add(...) appends to it later
+            this.expressions = new ArrayList<>(Arrays.asList(columns));
+        }
+
+        /**
+         * Sets the builder of the right sub-tree.
+         *
+         * @param operation The builder of the right child.
+         * @return this builder, to allow call chaining.
+         */
+        public Builder setRight(Builder operation)
+        {
+            this.right = operation;
+            return this;
+        }
+
+        /**
+         * Sets the builder of the left sub-tree.
+         *
+         * @param operation The builder of the left child.
+         * @return this builder, to allow call chaining.
+         */
+        public Builder setLeft(Builder operation)
+        {
+            this.left = operation;
+            return this;
+        }
+
+        /**
+         * Appends a single expression to the expressions of this node.
+         *
+         * @param e The expression to add.
+         */
+        public void add(RowFilter.Expression e)
+        {
+            expressions.add(e);
+        }
+
+        /**
+         * Appends all of the given expressions to the expressions of this node.
+         *
+         * @param newExpressions The expressions to add.
+         */
+        public void add(Collection<RowFilter.Expression> newExpressions)
+        {
+            // "expressions" is final and always assigned by the constructor, so it is never null here
+            expressions.addAll(newExpressions);
+        }
+
+        /**
+         * Builds the operation described by this builder, recursively completing its children.
+         *
+         * If this builder has expressions of its own they are analyzed and turned into
+         * index iterators by the controller, the completed right child (if any) is added
+         * to the same range; the left child is not used in that case.
+         * Otherwise the node only joins its completed children.
+         *
+         * @return The completed operation.
+         * @throws AssertionError if this builder has no expressions and neither child produced a range.
+         */
+        public Operation complete()
+        {
+            if (!expressions.isEmpty())
+            {
+                ListMultimap<ColumnDefinition, Expression> analyzedExpressions = analyzeGroup(controller, op, expressions);
+                RangeIterator.Builder<Long, Token> range = controller.getIndexes(op, analyzedExpressions.values());
+
+                Operation rightOp = null;
+                if (right != null)
+                {
+                    rightOp = right.complete();
+                    range.add(rightOp);
+                }
+
+                return new Operation(op, controller, analyzedExpressions, range.build(), null, rightOp);
+            }
+            else
+            {
+                Operation leftOp = null, rightOp = null;
+                boolean leftIndexes = false, rightIndexes = false;
+
+                if (left != null)
+                {
+                    leftOp = left.complete();
+                    leftIndexes = leftOp != null && leftOp.range != null;
+                }
+
+                if (right != null)
+                {
+                    rightOp = right.complete();
+                    rightIndexes = rightOp != null && rightOp.range != null;
+                }
+
+                RangeIterator<Long, Token> join;
+                /**
+                 * Operation should allow one of its sub-trees to wrap no indexes, that is related to the fact that we
+                 * have to accept defined-but-not-indexed columns as well as key range as IndexExpressions.
+                 *
+                 * Two cases are possible:
+                 *
+                 * only left child produced indexed iterators, that could happen when there are two columns
+                 * or key range on the right:
+                 *
+                 *                AND
+                 *              /     \
+                 *            OR       \
+                 *           /   \     AND
+                 *          a     b   /   \
+                 *                  key   key
+                 *
+                 * only right child produced indexed iterators:
+                 *
+                 *               AND
+                 *              /    \
+                 *            AND     a
+                 *           /   \
+                 *         key  key
+                 */
+                if (leftIndexes && !rightIndexes)
+                    join = leftOp;
+                else if (!leftIndexes && rightIndexes)
+                    join = rightOp;
+                else if (leftIndexes)
+                {
+                    // both children have a range: union them for OR, intersect them otherwise
+                    RangeIterator.Builder<Long, Token> builder = op == OperationType.OR
+                                                ? RangeUnionIterator.<Long, Token>builder()
+                                                : RangeIntersectionIterator.<Long, Token>builder();
+
+                    join = builder.add(leftOp).add(rightOp).build();
+                }
+                else
+                    throw new AssertionError("both sub-trees have 0 indexes.");
+
+                return new Operation(op, controller, null, join, leftOp, rightOp);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
new file mode 100644
index 0000000..8e10fd0
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.plan;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore.RefViewFragment;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.sasi.SASIIndex;
+import org.apache.cassandra.index.sasi.SSTableIndex;
+import org.apache.cassandra.index.sasi.TermIterator;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.conf.view.View;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.exceptions.TimeQuotaExceededException;
+import org.apache.cassandra.index.sasi.plan.Operation.OperationType;
+import org.apache.cassandra.index.sasi.utils.RangeIntersectionIterator;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.Pair;
+
+public class QueryController
+{
+    private final long executionQuota;
+    private final long executionStart;
+
+    private final ColumnFamilyStore cfs;
+    private final PartitionRangeReadCommand command;
+    private final Map<Collection<Expression>, List<RangeIterator<Long, Token>>> resources = new HashMap<>();
+    private final RefViewFragment scope;
+    private final Set<SSTableReader> sstables;
+
+    /**
+     * Creates a controller for a single execution of the given range command.
+     *
+     * @param cfs The column family store the command runs against.
+     * @param command The range read command being executed.
+     * @param timeQuotaMs The time budget in milliseconds, enforced by {@link #checkpoint()}.
+     */
+    public QueryController(ColumnFamilyStore cfs, PartitionRangeReadCommand command, long timeQuotaMs)
+    {
+        this.cfs = cfs;
+        this.command = command;
+        this.executionQuota = TimeUnit.MILLISECONDS.toNanos(timeQuotaMs);
+        this.executionStart = System.nanoTime(); // the quota is measured from this point on
+        this.scope = getSSTableScope(cfs, command); // referenced view, released in finish()
+        this.sstables = new HashSet<>(scope.sstables); // private copy of the SSTables of the scope
+    }
+
+    /**
+     * @return the "for thrift" flag of the underlying command.
+     */
+    public boolean isForThrift()
+    {
+        return command.isForThrift();
+    }
+
+    /**
+     * @return the table metadata of the underlying command.
+     */
+    public CFMetaData metadata()
+    {
+        return command.metadata();
+    }
+
+    /**
+     * @return the expressions of the row filter of the underlying command.
+     */
+    public Collection<RowFilter.Expression> getExpressions()
+    {
+        return command.rowFilter().getExpressions();
+    }
+
+    /**
+     * @return the data range of the underlying command.
+     */
+    public DataRange dataRange()
+    {
+        return command.dataRange();
+    }
+
+    /**
+     * @return the key validator taken from the metadata of the column family store.
+     */
+    public AbstractType<?> getKeyValidator()
+    {
+        return cfs.metadata.getKeyValidator();
+    }
+
+    /**
+     * Looks up the column index serving the given expression.
+     *
+     * @param expression The expression to find an index for.
+     * @return The column index of the best index reported by the index manager,
+     *         or {@code null} if the index manager reports none.
+     */
+    public ColumnIndex getIndex(RowFilter.Expression expression)
+    {
+        Optional<Index> best = cfs.indexManager.getBestIndexFor(expression);
+        if (!best.isPresent())
+            return null;
+
+        SASIIndex sasi = (SASIIndex) best.get();
+        return sasi.getIndex();
+    }
+
+
+    /**
+     * Reads a single partition using the settings of the underlying range command,
+     * with the row filter expressions stripped and without data limits.
+     *
+     * The time quota is checked after the read, whether it succeeded or not.
+     *
+     * @param key The key of the partition to read, must not be null.
+     * @param executionController The controller providing the read op-order group.
+     * @return The iterator over the partition.
+     * @throws NullPointerException if key is null.
+     */
+    public UnfilteredRowIterator getPartition(DecoratedKey key, ReadExecutionController executionController)
+    {
+        if (key == null)
+            throw new NullPointerException();
+        try
+        {
+            SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(command.isForThrift(),
+                                                                                     cfs.metadata,
+                                                                                     command.nowInSec(),
+                                                                                     command.columnFilter(),
+                                                                                     command.rowFilter().withoutExpressions(),
+                                                                                     DataLimits.NONE,
+                                                                                     key,
+                                                                                     command.clusteringIndexFilter(key));
+
+            return partition.queryMemtableAndDisk(cfs, executionController.baseReadOpOrderGroup());
+        }
+        finally
+        {
+            // NOTE(review): if checkpoint() throws here the iterator obtained above never
+            // reaches the caller - confirm that it does not have to be closed in that case.
+            checkpoint();
+        }
+    }
+
+    /**
+     * Build a range iterator from the given list of expressions by applying given operation (OR/AND).
+     * Building of such iterator involves index search, results of which are persisted in the internal resources list
+     * and can be released later via {@link QueryController#releaseIndexes(Operation)}.
+     *
+     * @param op The operation type to coalesce expressions with.
+     * @param expressions The expressions to build range iterator from (expressions with no results are ignored).
+     *
+     * @return The range builder based on given expressions and operation type.
+     * @throws IllegalArgumentException if the same expressions collection is already registered.
+     */
+    public RangeIterator.Builder<Long, Token> getIndexes(OperationType op, Collection<Expression> expressions)
+    {
+        if (resources.containsKey(expressions))
+            throw new IllegalArgumentException("Can't process the same expressions multiple times.");
+
+        // OR unions the per-expression results, everything else intersects them
+        RangeIterator.Builder<Long, Token> builder;
+        if (op == OperationType.OR)
+            builder = RangeUnionIterator.<Long, Token>builder();
+        else
+            builder = RangeIntersectionIterator.<Long, Token>builder();
+
+        List<RangeIterator<Long, Token>> opened = new ArrayList<>();
+
+        for (Map.Entry<Expression, Set<SSTableIndex>> candidate : getView(op, expressions).entrySet())
+        {
+            RangeIterator<Long, Token> index = TermIterator.build(candidate.getKey(), candidate.getValue());
+
+            // expressions for which no iterator was built are left out
+            if (index != null)
+            {
+                builder.add(index);
+                opened.add(index);
+            }
+        }
+
+        // remember what was opened so that it can be released later
+        resources.put(expressions, opened);
+        return builder;
+    }
+
+    public void checkpoint()
+    {
+        if ((System.nanoTime() - executionStart) >= executionQuota)
+            throw new TimeQuotaExceededException();
+    }
+
+    /**
+     * Releases the index iterators registered for the expressions of the given operation
+     * and forgets about them; operations without expressions are ignored.
+     *
+     * @param operation The operation whose indexes should be released.
+     */
+    public void releaseIndexes(Operation operation)
+    {
+        if (operation.expressions == null)
+            return;
+
+        List<RangeIterator<Long, Token>> registered = resources.remove(operation.expressions.values());
+        releaseIndexes(registered);
+    }
+
+    /**
+     * Quietly closes every iterator of the given list.
+     *
+     * @param indexes The iterators to close, {@code null} is accepted and ignored.
+     */
+    private void releaseIndexes(List<RangeIterator<Long, Token>> indexes)
+    {
+        if (indexes != null)
+            indexes.forEach(FileUtils::closeQuietly);
+    }
+
+    /**
+     * Releases every index iterator which is still registered and, no matter
+     * if that worked out, the reference on the SSTable scope.
+     */
+    public void finish()
+    {
+        try
+        {
+            for (List<RangeIterator<Long, Token>> indexes : resources.values())
+                releaseIndexes(indexes);
+        }
+        finally
+        {
+            scope.release();
+        }
+    }
+
+    /**
+     * Selects, for every searchable expression, the SSTable indexes which have to be read.
+     *
+     * Expressions which are not indexed, NOT_EQ expressions and expressions whose index
+     * has no view are left out of the result. For AND a "primary" expression is
+     * determined first; when there is one with a non-empty index set, the other expressions
+     * are matched against the key ranges of its indexes instead of the full SSTable set.
+     *
+     * @param op The operation type the expressions are combined with.
+     * @param expressions The expressions to select indexes for.
+     * @return The SSTable indexes to read, keyed by expression.
+     */
+    private Map<Expression, Set<SSTableIndex>> getView(OperationType op, Collection<Expression> expressions)
+    {
+        // first let's determine the primary expression if op is AND
+        Pair<Expression, Set<SSTableIndex>> primary = (op == OperationType.AND) ? calculatePrimary(expressions) : null;
+
+        Map<Expression, Set<SSTableIndex>> indexes = new HashMap<>();
+        for (Expression e : expressions)
+        {
+            // NOT_EQ and non-index column query should only act as FILTER BY for satisfiedBy(Row) method
+            // because otherwise it is likely to go through the whole index.
+            if (!e.isIndexed() || e.getOp() == Expression.Op.NOT_EQ)
+                continue;
+
+            // primary expression, we'll have to add as is
+            if (primary != null && e.equals(primary.left))
+            {
+                indexes.put(primary.left, primary.right);
+                continue;
+            }
+
+            View view = e.index.getView();
+            if (view == null)
+                continue;
+
+            Set<SSTableIndex> readers = new HashSet<>();
+            if (primary != null && primary.right.size() > 0)
+            {
+                // only the indexes overlapping the key range of a primary index are of interest
+                for (SSTableIndex index : primary.right)
+                    readers.addAll(view.match(index.minKey(), index.maxKey()));
+            }
+            else
+            {
+                readers.addAll(view.match(sstables, e));
+            }
+
+            indexes.put(e, readers);
+        }
+
+        return indexes;
+    }
+
+    /**
+     * Picks the "primary" expression: the indexed expression (with a view)
+     * which matches the smallest number of SSTable indexes.
+     *
+     * @param expressions The expressions to choose from.
+     * @return The primary expression paired with the SSTable indexes it matches,
+     *         or {@code null} if no expression is indexed and has a view.
+     */
+    private Pair<Expression, Set<SSTableIndex>> calculatePrimary(Collection<Expression> expressions)
+    {
+        Expression expression = null;
+        Set<SSTableIndex> primaryIndexes = Collections.emptySet();
+
+        for (Expression e : expressions)
+        {
+            if (!e.isIndexed())
+                continue;
+
+            View view = e.index.getView();
+            if (view == null)
+                continue;
+
+            Set<SSTableIndex> indexes = view.match(sstables, e);
+
+            // The first eligible expression always becomes the candidate: comparing sizes
+            // against the initial empty set alone could never succeed (0 > n is always false),
+            // so no primary expression would ever be selected. Later candidates replace
+            // it only if they match strictly fewer indexes.
+            if (expression == null || primaryIndexes.size() > indexes.size())
+            {
+                primaryIndexes = indexes;
+                expression = e;
+            }
+        }
+
+        return expression == null ? null : Pair.create(expression, primaryIndexes);
+    }
+
+    /**
+     * Selects and references the canonical SSTables covering the key range of the command.
+     * The returned fragment is stored as the scope of the controller and released in finish().
+     *
+     * @param cfs The column family store to select the SSTables from.
+     * @param command The command whose key range limits the selection.
+     * @return The referenced view fragment.
+     */
+    private static RefViewFragment getSSTableScope(ColumnFamilyStore cfs, PartitionRangeReadCommand command)
+    {
+        return cfs.selectAndReference(org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL, command.dataRange().keyRange()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
new file mode 100644
index 0000000..d34b05a
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.plan;
+
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.plan.Operation.OperationType;
+import org.apache.cassandra.exceptions.RequestTimeoutException;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.AbstractIterator;
+
+public class QueryPlan
+{
+    private final QueryController controller;
+
+    public QueryPlan(ColumnFamilyStore cfs, ReadCommand command, long executionQuotaMs)
+    {
+        this.controller = new QueryController(cfs, (PartitionRangeReadCommand) command, executionQuotaMs);
+    }
+
+    /**
+     * Converts expressions into operation tree (which is currently just a single AND).
+     *
+     * Operation tree allows us to do a couple of important optimizations
+     * namely, group flattening for AND operations (query rewrite), expression bounds checks,
+     * "satisfies by" checks for resulting rows with an early exit.
+     *
+     * @return root of the operations tree.
+     */
+    private Operation analyze()
+    {
+        try
+        {
+            Operation.Builder and = new Operation.Builder(OperationType.AND, controller);
+            controller.getExpressions().forEach(and::add);
+            return and.complete();
+        }
+        catch (Exception | Error e)
+        {
+            controller.finish();
+            throw e;
+        }
+    }
+
+    public UnfilteredPartitionIterator execute(ReadExecutionController executionController) throws RequestTimeoutException
+    {
+        return new ResultIterator(analyze(), controller, executionController);
+    }
+
+    private static class ResultIterator extends AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator
+    {
+        private final AbstractBounds<PartitionPosition> keyRange;
+        private final Operation operationTree;
+        private final QueryController controller;
+        private final ReadExecutionController executionController;
+
+        private Iterator<DecoratedKey> currentKeys = null;
+
+        public ResultIterator(Operation operationTree, QueryController controller, ReadExecutionController executionController)
+        {
+            this.keyRange = controller.dataRange().keyRange();
+            this.operationTree = operationTree;
+            this.controller = controller;
+            this.executionController = executionController;
+            if (operationTree != null)
+                operationTree.skipTo((Long) keyRange.left.getToken().getTokenValue());
+        }
+
+        protected UnfilteredRowIterator computeNext()
+        {
+            if (operationTree == null)
+                return endOfData();
+
+            for (;;)
+            {
+                if (currentKeys == null || !currentKeys.hasNext())
+                {
+                    if (!operationTree.hasNext())
+                         return endOfData();
+
+                    Token token = operationTree.next();
+                    currentKeys = token.iterator();
+                }
+
+                while (currentKeys.hasNext())
+                {
+                    DecoratedKey key = currentKeys.next();
+
+                    if (!keyRange.right.isMinimum() && keyRange.right.compareTo(key) < 0)
+                        return endOfData();
+
+                    try (UnfilteredRowIterator partition = controller.getPartition(key, executionController))
+                    {
+                        List<Unfiltered> clusters = new ArrayList<>();
+                        while (partition.hasNext())
+                        {
+                            Unfiltered row = partition.next();
+                            if (operationTree.satisfiedBy(row, true))
+                                clusters.add(row);
+                        }
+
+                        if (!clusters.isEmpty())
+                            return new PartitionIterator(partition, clusters);
+                    }
+                }
+            }
+        }
+
+        private static class PartitionIterator extends AbstractUnfilteredRowIterator
+        {
+            private final Iterator<Unfiltered> rows;
+
+            public PartitionIterator(UnfilteredRowIterator partition, Collection<Unfiltered> content)
+            {
+                super(partition.metadata(),
+                      partition.partitionKey(),
+                      partition.partitionLevelDeletion(),
+                      partition.columns(),
+                      partition.staticRow(),
+                      partition.isReverseOrder(),
+                      partition.stats());
+
+                rows = content.iterator();
+            }
+
+            @Override
+            protected Unfiltered computeNext()
+            {
+                return rows.hasNext() ? rows.next() : endOfData();
+            }
+        }
+
+        public boolean isForThrift()
+        {
+            return controller.isForThrift();
+        }
+
+        public CFMetaData metadata()
+        {
+            return controller.metadata();
+        }
+
+        public void close()
+        {
+            FileUtils.closeQuietly(operationTree);
+            controller.finish();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/ByteTerm.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/ByteTerm.java b/src/java/org/apache/cassandra/index/sasi/sa/ByteTerm.java
new file mode 100644
index 0000000..c7bbab7
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/sa/ByteTerm.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.sa;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+/**
+ * {@link Term} whose value is kept as raw bytes.
+ */
+public class ByteTerm extends Term<ByteBuffer>
+{
+    public ByteTerm(int position, ByteBuffer value, TokenTreeBuilder tokens)
+    {
+        super(position, value, tokens);
+    }
+
+    /**
+     * @return an independent view (duplicate) of the term's bytes.
+     */
+    public ByteBuffer getTerm()
+    {
+        return value.duplicate();
+    }
+
+    /**
+     * @param start Number of bytes to skip from the value's current position.
+     *
+     * @return a duplicate of the value positioned {@code start} bytes further.
+     */
+    public ByteBuffer getSuffix(int start)
+    {
+        ByteBuffer suffix = value.duplicate();
+        suffix.position(value.position() + start);
+        return suffix;
+    }
+
+    /**
+     * Compares the raw values of both terms using the given comparator;
+     * the other term is expected to hold a {@link ByteBuffer} value.
+     */
+    public int compareTo(AbstractType<?> comparator, Term other)
+    {
+        ByteBuffer otherValue = (ByteBuffer) other.value;
+        return comparator.compare(value, otherValue);
+    }
+
+    /**
+     * @return number of bytes remaining in the value.
+     */
+    public int length()
+    {
+        return value.remaining();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/CharTerm.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/CharTerm.java b/src/java/org/apache/cassandra/index/sasi/sa/CharTerm.java
new file mode 100644
index 0000000..533b566
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/sa/CharTerm.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.sa;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+import com.google.common.base.Charsets;
+
+/**
+ * {@link Term} whose value is kept as decoded characters; the byte representations
+ * it returns are produced by encoding those characters as UTF-8.
+ */
+public class CharTerm extends Term<CharBuffer>
+{
+    public CharTerm(int position, CharBuffer value, TokenTreeBuilder tokens)
+    {
+        super(position, value, tokens);
+    }
+
+    /**
+     * @return the UTF-8 encoding of the whole term.
+     */
+    @Override
+    public ByteBuffer getTerm()
+    {
+        // encode a duplicate so the position of the stored value is left untouched
+        return Charsets.UTF_8.encode(value.duplicate());
+    }
+
+    /**
+     * @param start Number of characters to skip from the beginning of the term.
+     *
+     * @return the UTF-8 encoding of the term starting at the given character.
+     */
+    @Override
+    public ByteBuffer getSuffix(int start)
+    {
+        // CharBuffer.subSequence(..) takes indexes relative to the current position,
+        // so the position must not be added to the start index a second time.
+        return Charsets.UTF_8.encode(value.subSequence(start, value.remaining()));
+    }
+
+    /**
+     * Compares the character values of both terms directly; the given comparator is not used.
+     * The other term is expected to hold a {@link CharBuffer} value.
+     */
+    @Override
+    public int compareTo(AbstractType<?> comparator, Term other)
+    {
+        return value.compareTo((CharBuffer) other.value);
+    }
+
+    /**
+     * @return length of the term in characters.
+     */
+    @Override
+    public int length()
+    {
+        return value.length();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/IntegralSA.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/IntegralSA.java b/src/java/org/apache/cassandra/index/sasi/sa/IntegralSA.java
new file mode 100644
index 0000000..8356585
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/sa/IntegralSA.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.sa;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Pair;
+
+public class IntegralSA extends SA<ByteBuffer>
+{
+    public IntegralSA(AbstractType<?> comparator, OnDiskIndexBuilder.Mode mode)
+    {
+        super(comparator, mode);
+    }
+
+    public Term<ByteBuffer> getTerm(ByteBuffer termValue, TokenTreeBuilder tokens)
+    {
+        return new ByteTerm(charCount, termValue, tokens);
+    }
+
+    public TermIterator finish()
+    {
+        return new IntegralSuffixIterator();
+    }
+
+
+    private class IntegralSuffixIterator extends TermIterator
+    {
+        private final Iterator<Term<ByteBuffer>> termIterator;
+
+        public IntegralSuffixIterator()
+        {
+            Collections.sort(terms, new Comparator<Term<?>>()
+            {
+                public int compare(Term<?> a, Term<?> b)
+                {
+                    return a.compareTo(comparator, b);
+                }
+            });
+
+            termIterator = terms.iterator();
+        }
+
+        public ByteBuffer minTerm()
+        {
+            return terms.get(0).getTerm();
+        }
+
+        public ByteBuffer maxTerm()
+        {
+            return terms.get(terms.size() - 1).getTerm();
+        }
+
+        protected Pair<ByteBuffer, TokenTreeBuilder> computeNext()
+        {
+            if (!termIterator.hasNext())
+                return endOfData();
+
+            Term<ByteBuffer> term = termIterator.next();
+            return Pair.create(term.getTerm(), term.getTokens().finish());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/SA.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/SA.java b/src/java/org/apache/cassandra/index/sasi/sa/SA.java
new file mode 100644
index 0000000..75f9f92
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/sa/SA.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.sa;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder.Mode;
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+/**
+ * Base class accumulating terms (together with their tokens) which are later
+ * handed out in sorted order by the {@link TermIterator} returned from {@link #finish()}.
+ *
+ * @param <T> The buffer type used to hold term values.
+ */
+public abstract class SA<T extends Buffer>
+{
+    protected final AbstractType<?> comparator;
+    protected final Mode mode;
+
+    // terms in insertion order
+    protected final List<Term<T>> terms = new ArrayList<>();
+
+    // total length of all terms added so far
+    protected int charCount = 0;
+
+    public SA(AbstractType<?> comparator, Mode mode)
+    {
+        this.mode = mode;
+        this.comparator = comparator;
+    }
+
+    public Mode getMode()
+    {
+        return mode;
+    }
+
+    /**
+     * Wraps the value into a term, records it and grows {@link #charCount} by the term's length.
+     *
+     * @param termValue The raw value of the term.
+     * @param tokens The tokens associated with the term.
+     */
+    public void add(ByteBuffer termValue, TokenTreeBuilder tokens)
+    {
+        // the term has to be created before charCount is advanced
+        // (the visible implementations use the current charCount as the term's position)
+        Term<T> added = getTerm(termValue, tokens);
+        terms.add(added);
+        charCount = charCount + added.length();
+    }
+
+    /**
+     * @return iterator over the accumulated terms.
+     */
+    public abstract TermIterator finish();
+
+    /**
+     * Creates the term representation for the given raw value and tokens.
+     */
+    protected abstract Term<T> getTerm(ByteBuffer termValue, TokenTreeBuilder tokens);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java b/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java
new file mode 100644
index 0000000..63f6c5b
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.sa;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Pair;
+
+import com.google.common.base.Charsets;
+import net.mintern.primitive.Primitive;
+
+/**
+ * {@link SA} implementation which indexes every suffix of every added term.
+ *
+ * Terms are decoded as UTF-8 characters, so positions and lengths are counted in characters.
+ */
+public class SuffixSA extends SA<CharBuffer>
+{
+    public SuffixSA(AbstractType<?> comparator, OnDiskIndexBuilder.Mode mode)
+    {
+        super(comparator, mode);
+    }
+
+    protected Term<CharBuffer> getTerm(ByteBuffer termValue, TokenTreeBuilder tokens)
+    {
+        // decode a duplicate so the caller's buffer position is left untouched
+        return new CharTerm(charCount, Charsets.UTF_8.decode(termValue.duplicate()), tokens);
+    }
+
+    public TermIterator finish()
+    {
+        return new SASuffixIterator();
+    }
+
+    /**
+     * Sorts all suffixes on construction, then returns every distinct suffix once,
+     * together with the merged tokens of all the terms it occurs in.
+     */
+    private class SASuffixIterator extends TermIterator
+    {
+        private final long[] suffixes;
+
+        private int current = 0;
+        private ByteBuffer lastProcessedSuffix;
+        private TokenTreeBuilder container;
+
+        public SASuffixIterator()
+        {
+            // each element has term index and char position encoded as two 32-bit integers
+            // to avoid binary search per suffix while sorting suffix array.
+            suffixes = new long[charCount];
+
+            long termIndex = -1, currentTermLength = -1;
+            for (int i = 0; i < charCount; i++)
+            {
+                // Advance to the term which owns character i. This has to be a loop: a zero-length
+                // term owns no characters, so a single step would stop on it and attribute
+                // position i (and every position after it) to the wrong term.
+                while (i >= currentTermLength)
+                {
+                    Term currentTerm = terms.get((int) ++termIndex);
+                    currentTermLength = currentTerm.getPosition() + currentTerm.length();
+                }
+
+                suffixes[i] = (termIndex << 32) | i;
+            }
+
+            Primitive.sort(suffixes, (a, b) -> {
+                Term aTerm = terms.get((int) (a >>> 32));
+                Term bTerm = terms.get((int) (b >>> 32));
+                // low 32 bits hold the global char position, subtracting the term's
+                // position gives the offset of the suffix inside the term
+                return comparator.compare(aTerm.getSuffix(((int) a) - aTerm.getPosition()),
+                                          bTerm.getSuffix(((int) b) - bTerm.getPosition()));
+            });
+        }
+
+        /**
+         * @return the suffix stored at the given index of the sorted array
+         *         paired with the tokens of the term it belongs to.
+         */
+        private Pair<ByteBuffer, TokenTreeBuilder> suffixAt(int position)
+        {
+            long index = suffixes[position];
+            Term term = terms.get((int) (index >>> 32));
+            return Pair.create(term.getSuffix(((int) index) - term.getPosition()), term.getTokens());
+        }
+
+        public ByteBuffer minTerm()
+        {
+            return suffixAt(0).left;
+        }
+
+        public ByteBuffer maxTerm()
+        {
+            return suffixAt(suffixes.length - 1).left;
+        }
+
+        protected Pair<ByteBuffer, TokenTreeBuilder> computeNext()
+        {
+            while (true)
+            {
+                if (current >= suffixes.length)
+                {
+                    // everything has been consumed, flush the pending suffix (if any) exactly once
+                    if (lastProcessedSuffix == null)
+                        return endOfData();
+
+                    Pair<ByteBuffer, TokenTreeBuilder> result = finishSuffix();
+
+                    lastProcessedSuffix = null;
+                    return result;
+                }
+
+                Pair<ByteBuffer, TokenTreeBuilder> suffix = suffixAt(current++);
+
+                if (lastProcessedSuffix == null)
+                {
+                    // very first suffix, start a new group
+                    lastProcessedSuffix = suffix.left;
+                    container = new TokenTreeBuilder(suffix.right.getTokens());
+                }
+                else if (comparator.compare(lastProcessedSuffix, suffix.left) == 0)
+                {
+                    // same suffix as the pending one, merge its tokens into the group
+                    lastProcessedSuffix = suffix.left;
+                    container.add(suffix.right.getTokens());
+                }
+                else
+                {
+                    // a different suffix, emit the pending group and start a new one
+                    Pair<ByteBuffer, TokenTreeBuilder> result = finishSuffix();
+
+                    lastProcessedSuffix = suffix.left;
+                    container = new TokenTreeBuilder(suffix.right.getTokens());
+
+                    return result;
+                }
+            }
+        }
+
+        private Pair<ByteBuffer, TokenTreeBuilder> finishSuffix()
+        {
+            return Pair.create(lastProcessedSuffix, container.finish());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/Term.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/Term.java b/src/java/org/apache/cassandra/index/sasi/sa/Term.java
new file mode 100644
index 0000000..fe6eca8
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/sa/Term.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.sa;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+/**
+ * A single value added to a suffix array together with its position
+ * and the tokens associated with it.
+ *
+ * @param <T> The buffer type holding the value.
+ */
+public abstract class Term<T extends Buffer>
+{
+    protected final int position;
+    protected final T value;
+    protected TokenTreeBuilder tokens;
+
+    public Term(int position, T value, TokenTreeBuilder tokens)
+    {
+        this.tokens = tokens;
+        this.value = value;
+        this.position = position;
+    }
+
+    /**
+     * @return the position given to this term at construction.
+     */
+    public int getPosition()
+    {
+        return position;
+    }
+
+    /**
+     * @return the tokens associated with this term.
+     */
+    public TokenTreeBuilder getTokens()
+    {
+        return tokens;
+    }
+
+    /**
+     * @return the complete term as bytes.
+     */
+    public abstract ByteBuffer getTerm();
+
+    /**
+     * @param start Offset inside of the term at which the suffix begins.
+     *
+     * @return the part of the term beginning at the given offset, as bytes.
+     */
+    public abstract ByteBuffer getSuffix(int start);
+
+    /**
+     * Compares this term to the other one; implementations decide whether the comparator is used.
+     */
+    public abstract int compareTo(AbstractType<?> comparator, Term other);
+
+    /**
+     * @return length of the term, in units of the underlying buffer.
+     */
+    public abstract int length();
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/sa/TermIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/TermIterator.java b/src/java/org/apache/cassandra/index/sasi/sa/TermIterator.java
new file mode 100644
index 0000000..916aa07
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/sa/TermIterator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.sa;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.utils.Pair;
+
+import com.google.common.collect.AbstractIterator;
+
+/**
+ * Iterator over (term, tokens) pairs which additionally exposes
+ * the smallest and the largest term of the iterated set.
+ */
+public abstract class TermIterator extends AbstractIterator<Pair<ByteBuffer, TokenTreeBuilder>>
+{
+    /** @return the first term in sorted order. */
+    public abstract ByteBuffer minTerm();
+    /** @return the last term in sorted order. */
+    public abstract ByteBuffer maxTerm();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/AbstractIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/AbstractIterator.java b/src/java/org/apache/cassandra/index/sasi/utils/AbstractIterator.java
new file mode 100644
index 0000000..cf918c1
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/AbstractIterator.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright (C) 2007 The Guava Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi.utils;
+
+import java.util.NoSuchElementException;
+
+import com.google.common.collect.PeekingIterator;
+
+import static com.google.common.base.Preconditions.checkState;
+
+// This is fork of the Guava AbstractIterator, the only difference
+// is that state & next variables are now protected, this was required
+// for SkippableIterator.skipTo(..) to void all previous state.
+public abstract class AbstractIterator<T> implements PeekingIterator<T>
+{
+    protected State state = State.NOT_READY;
+
+    /** Constructor for use by subclasses. */
+    protected AbstractIterator() {}
+
+    protected enum State
+    {
+        /** We have computed the next element and haven't returned it yet. */
+        READY,
+
+        /** We haven't yet computed or have already returned the element. */
+        NOT_READY,
+
+        /** We have reached the end of the data and are finished. */
+        DONE,
+
+        /** We've suffered an exception and are kaput. */
+        FAILED,
+    }
+
+    protected T next;
+
+    /**
+     * Returns the next element. <b>Note:</b> the implementation must call {@link
+     * #endOfData()} when there are no elements left in the iteration. Failure to
+     * do so could result in an infinite loop.
+     *
+     * <p>The initial invocation of {@link #hasNext()} or {@link #next()} calls
+     * this method, as does the first invocation of {@code hasNext} or {@code
+     * next} following each successful call to {@code next}. Once the
+     * implementation either invokes {@code endOfData} or throws an exception,
+     * {@code computeNext} is guaranteed to never be called again.
+     *
+     * <p>If this method throws an exception, it will propagate outward to the
+     * {@code hasNext} or {@code next} invocation that invoked this method. Any
+     * further attempts to use the iterator will result in an {@link
+     * IllegalStateException}.
+     *
+     * <p>The implementation of this method may not invoke the {@code hasNext},
+     * {@code next}, or {@link #peek()} methods on this instance; if it does, an
+     * {@code IllegalStateException} will result.
+     *
+     * @return the next element if there was one. If {@code endOfData} was called
+     *     during execution, the return value will be ignored.
+     * @throws RuntimeException if any unrecoverable error happens. This exception
+     *     will propagate outward to the {@code hasNext()}, {@code next()}, or
+     *     {@code peek()} invocation that invoked this method. Any further
+     *     attempts to use the iterator will result in an
+     *     {@link IllegalStateException}.
+     */
+    protected abstract T computeNext();
+
+    /**
+     * Implementations of {@link #computeNext} <b>must</b> invoke this method when
+     * there are no elements left in the iteration.
+     *
+     * @return {@code null}; a convenience so your {@code computeNext}
+     *     implementation can use the simple statement {@code return endOfData();}
+     */
+    protected final T endOfData()
+    {
+        state = State.DONE;
+        return null;
+    }
+
+    public final boolean hasNext()
+    {
+        checkState(state != State.FAILED);
+
+        switch (state)
+        {
+            case DONE:
+                return false;
+
+            case READY:
+                return true;
+
+            default:
+        }
+
+        return tryToComputeNext();
+    }
+
+    protected boolean tryToComputeNext()
+    {
+        state = State.FAILED; // temporary pessimism
+        next = computeNext();
+
+        if (state != State.DONE)
+        {
+            state = State.READY;
+            return true;
+        }
+
+        return false;
+    }
+
+    public final T next()
+    {
+        if (!hasNext())
+            throw new NoSuchElementException();
+
+        state = State.NOT_READY;
+        return next;
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Returns the next element in the iteration without advancing the iteration,
+     * according to the contract of {@link PeekingIterator#peek()}.
+     *
+     * <p>Implementations of {@code AbstractIterator} that wish to expose this
+     * functionality should implement {@code PeekingIterator}.
+     */
+    public final T peek()
+    {
+        if (!hasNext())
+            throw new NoSuchElementException();
+
+        return next;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java b/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java
new file mode 100644
index 0000000..2bf5a07
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.cassandra.index.sasi.disk.OnDiskIndex.DataTerm;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.disk.TokenTree;
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
+
+/**
+ * A {@link DataTerm} paired with the token to offsets mapping collected from it.
+ * The mapping is kept sorted by token (it is a {@link TreeMap}) and can absorb the
+ * mapping of another {@code CombinedTerm} through {@link #merge(CombinedValue)}.
+ */
+public class CombinedTerm implements CombinedValue<DataTerm>
+{
+    private final AbstractType<?> comparator;
+    private final DataTerm term;
+    private final TreeMap<Long, LongSet> tokens;
+
+    /**
+     * Reads every token of {@code term} and groups its offsets by token value.
+     *
+     * @param comparator comparator used by {@link #compareTo(CombinedValue)} to order terms
+     * @param term       the term whose tokens are drained into this instance
+     */
+    public CombinedTerm(AbstractType<?> comparator, DataTerm term)
+    {
+        this.comparator = comparator;
+        this.term = term;
+        this.tokens = new TreeMap<>();
+
+        for (RangeIterator<Long, Token> source = term.getTokens(); source.hasNext(); )
+        {
+            Token token = source.next();
+            LongSet collected = offsetsFor(token.get());
+
+            for (Long offset : ((TokenTree.OnDiskToken) token).getOffsets())
+                collected.add(offset);
+        }
+    }
+
+    /** @return the raw bytes of the wrapped term */
+    public ByteBuffer getTerm()
+    {
+        return term.getTerm();
+    }
+
+    /** @return the live (not copied) token to offsets mapping held by this instance */
+    public Map<Long, LongSet> getTokens()
+    {
+        return tokens;
+    }
+
+    /** @return the result of {@code finish()} on a new builder created from the current mapping */
+    public TokenTreeBuilder getTokenTreeBuilder()
+    {
+        TokenTreeBuilder builder = new TokenTreeBuilder(tokens);
+        return builder.finish();
+    }
+
+    /**
+     * Adds all token offsets of {@code other} to this instance.
+     * Values that are not {@code CombinedTerm}s are silently ignored.
+     */
+    public void merge(CombinedValue<DataTerm> other)
+    {
+        if (other instanceof CombinedTerm)
+        {
+            CombinedTerm that = (CombinedTerm) other;
+
+            assert comparator == that.comparator;
+
+            for (Map.Entry<Long, LongSet> entry : that.tokens.entrySet())
+            {
+                LongSet collected = offsetsFor(entry.getKey());
+
+                for (LongCursor cursor : entry.getValue())
+                    collected.add(cursor.value);
+            }
+        }
+    }
+
+    public DataTerm get()
+    {
+        return term;
+    }
+
+    /** Orders combined values by their term bytes using this instance's comparator. */
+    public int compareTo(CombinedValue<DataTerm> o)
+    {
+        ByteBuffer otherTerm = o.get().getTerm();
+        return term.compareTo(comparator, otherTerm);
+    }
+
+    /** Returns the offset set registered for {@code token}, registering an empty one if absent. */
+    private LongSet offsetsFor(Long token)
+    {
+        LongSet existing = tokens.get(token);
+        if (existing != null)
+            return existing;
+
+        LongSet created = new LongOpenHashSet();
+        tokens.put(token, created);
+        return created;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/CombinedTermIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/CombinedTermIterator.java b/src/java/org/apache/cassandra/index/sasi/utils/CombinedTermIterator.java
new file mode 100644
index 0000000..06c27bf
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/CombinedTermIterator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.index.sasi.disk.Descriptor;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndex;
+import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
+import org.apache.cassandra.index.sasi.sa.TermIterator;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Pair;
+
+public class CombinedTermIterator extends TermIterator
+{
+    final Descriptor descriptor;
+    final RangeIterator<OnDiskIndex.DataTerm, CombinedTerm> union;
+    final ByteBuffer min;
+    final ByteBuffer max;
+
+    public CombinedTermIterator(OnDiskIndex... sas)
+    {
+        this(Descriptor.CURRENT, sas);
+    }
+
+    public CombinedTermIterator(Descriptor d, OnDiskIndex... parts)
+    {
+        descriptor = d;
+        union = OnDiskIndexIterator.union(parts);
+
+        AbstractType<?> comparator = parts[0].getComparator(); // assumes all SAs have same comparator
+        ByteBuffer minimum = parts[0].minTerm();
+        ByteBuffer maximum = parts[0].maxTerm();
+
+        for (int i = 1; i < parts.length; i++)
+        {
+            OnDiskIndex part = parts[i];
+            if (part == null)
+                continue;
+
+            minimum = comparator.compare(minimum, part.minTerm()) > 0 ? part.minTerm() : minimum;
+            maximum = comparator.compare(maximum, part.maxTerm()) < 0 ? part.maxTerm() : maximum;
+        }
+
+        min = minimum;
+        max = maximum;
+    }
+
+    public ByteBuffer minTerm()
+    {
+        return min;
+    }
+
+    public ByteBuffer maxTerm()
+    {
+        return max;
+    }
+
+    protected Pair<ByteBuffer, TokenTreeBuilder> computeNext()
+    {
+        if (!union.hasNext())
+        {
+            return endOfData();
+        }
+        else
+        {
+            CombinedTerm term = union.next();
+            return Pair.create(term.getTerm(), term.getTokenTreeBuilder());
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/CombinedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/CombinedValue.java b/src/java/org/apache/cassandra/index/sasi/utils/CombinedValue.java
new file mode 100644
index 0000000..ca5f9be
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/CombinedValue.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+/**
+ * A comparable wrapper around a value of type {@code V} that can take in the
+ * contents of another wrapper of the same kind.
+ *
+ * @param <V> type of the wrapped value
+ */
+public interface CombinedValue<V> extends Comparable<CombinedValue<V>>
+{
+    /**
+     * @return the wrapped value
+     */
+    V get();
+
+    /**
+     * Folds {@code other} into this value; what exactly is combined is up to the implementation.
+     *
+     * @param other the value to merge into this one
+     */
+    void merge(CombinedValue<V> other);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/MappedBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/MappedBuffer.java b/src/java/org/apache/cassandra/index/sasi/utils/MappedBuffer.java
new file mode 100644
index 0000000..37ab1be
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/MappedBuffer.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel.MapMode;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class MappedBuffer implements Closeable
+{
+    private final MappedByteBuffer[] pages;
+
+    private long position, limit;
+    private final long capacity;
+    private final int pageSize, sizeBits;
+
+    private MappedBuffer(MappedBuffer other)
+    {
+        this.sizeBits = other.sizeBits;
+        this.pageSize = other.pageSize;
+        this.position = other.position;
+        this.limit = other.limit;
+        this.capacity = other.capacity;
+        this.pages = other.pages;
+    }
+
+    public MappedBuffer(RandomAccessReader file)
+    {
+        this(file.getChannel(), 30);
+    }
+
+    public MappedBuffer(ChannelProxy file)
+    {
+        this(file, 30);
+    }
+
+    @VisibleForTesting
+    protected MappedBuffer(ChannelProxy file, int numPageBits)
+    {
+        if (numPageBits > Integer.SIZE - 1)
+            throw new IllegalArgumentException("page size can't be bigger than 1G");
+
+        sizeBits = numPageBits;
+        pageSize = 1 << sizeBits;
+        position = 0;
+        limit = capacity = file.size();
+        pages = new MappedByteBuffer[(int) (file.size() / pageSize) + 1];
+
+        try
+        {
+            long offset = 0;
+            for (int i = 0; i < pages.length; i++)
+            {
+                long pageSize = Math.min(this.pageSize, (capacity - offset));
+                pages[i] = file.map(MapMode.READ_ONLY, offset, pageSize);
+                offset += pageSize;
+            }
+        }
+        finally
+        {
+            file.close();
+        }
+    }
+
+    public int comparePageTo(long offset, int length, AbstractType<?> comparator, ByteBuffer other)
+    {
+        return comparator.compare(getPageRegion(offset, length), other);
+    }
+
+    public long capacity()
+    {
+        return capacity;
+    }
+
+    public long position()
+    {
+        return position;
+    }
+
+    public MappedBuffer position(long newPosition)
+    {
+        if (newPosition < 0 || newPosition > limit)
+            throw new IllegalArgumentException("position: " + newPosition + ", limit: " + limit);
+
+        position = newPosition;
+        return this;
+    }
+
+    public long limit()
+    {
+        return limit;
+    }
+
+    public MappedBuffer limit(long newLimit)
+    {
+        if (newLimit < position || newLimit > capacity)
+            throw new IllegalArgumentException();
+
+        limit = newLimit;
+        return this;
+    }
+
+    public long remaining()
+    {
+        return limit - position;
+    }
+
+    public boolean hasRemaining()
+    {
+        return remaining() > 0;
+    }
+
+    public byte get()
+    {
+        return get(position++);
+    }
+
+    public byte get(long pos)
+    {
+        return pages[getPage(pos)].get(getPageOffset(pos));
+    }
+
+    public short getShort()
+    {
+        short value = getShort(position);
+        position += 2;
+        return value;
+    }
+
+    public short getShort(long pos)
+    {
+        if (isPageAligned(pos, 2))
+            return pages[getPage(pos)].getShort(getPageOffset(pos));
+
+        int ch1 = get(pos)     & 0xff;
+        int ch2 = get(pos + 1) & 0xff;
+        return (short) ((ch1 << 8) + ch2);
+    }
+
+    public int getInt()
+    {
+        int value = getInt(position);
+        position += 4;
+        return value;
+    }
+
+    public int getInt(long pos)
+    {
+        if (isPageAligned(pos, 4))
+            return pages[getPage(pos)].getInt(getPageOffset(pos));
+
+        int ch1 = get(pos)     & 0xff;
+        int ch2 = get(pos + 1) & 0xff;
+        int ch3 = get(pos + 2) & 0xff;
+        int ch4 = get(pos + 3) & 0xff;
+
+        return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4);
+    }
+
+    public long getLong()
+    {
+        long value = getLong(position);
+        position += 8;
+        return value;
+    }
+
+
+    public long getLong(long pos)
+    {
+        // fast path if the long could be retrieved from a single page
+        // that would avoid multiple expensive look-ups into page array.
+        return (isPageAligned(pos, 8))
+                ? pages[getPage(pos)].getLong(getPageOffset(pos))
+                : ((long) (getInt(pos)) << 32) + (getInt(pos + 4) & 0xFFFFFFFFL);
+    }
+
+    public ByteBuffer getPageRegion(long position, int length)
+    {
+        if (!isPageAligned(position, length))
+            throw new IllegalArgumentException(String.format("range: %s-%s wraps more than one page", position, length));
+
+        ByteBuffer slice = pages[getPage(position)].duplicate();
+
+        int pageOffset = getPageOffset(position);
+        slice.position(pageOffset).limit(pageOffset + length);
+
+        return slice;
+    }
+
+    public MappedBuffer duplicate()
+    {
+        return new MappedBuffer(this);
+    }
+
+    public void close()
+    {
+        if (!FileUtils.isCleanerAvailable())
+            return;
+
+        /*
+         * Try forcing the unmapping of pages using undocumented unsafe sun APIs.
+         * If this fails (non Sun JVM), we'll have to wait for the GC to finalize the mapping.
+         * If this works and a thread tries to access any page, hell will unleash on earth.
+         */
+        try
+        {
+            for (MappedByteBuffer segment : pages)
+                FileUtils.clean(segment);
+        }
+        catch (Exception e)
+        {
+            // This is not supposed to happen
+        }
+    }
+
+    private int getPage(long position)
+    {
+        return (int) (position >> sizeBits);
+    }
+
+    private int getPageOffset(long position)
+    {
+        return (int) (position & pageSize - 1);
+    }
+
+    private boolean isPageAligned(long position, int length)
+    {
+        return pageSize - (getPageOffset(position) + length) > 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/utils/OnDiskIndexIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/OnDiskIndexIterator.java b/src/java/org/apache/cassandra/index/sasi/utils/OnDiskIndexIterator.java
new file mode 100644
index 0000000..ae97cab
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/utils/OnDiskIndexIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.cassandra.index.sasi.disk.OnDiskIndex;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndex.DataTerm;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+/**
+ * Adapts the terms of a single {@link OnDiskIndex} to a {@link RangeIterator} of
+ * {@link CombinedTerm}s, so that several indexes can be combined through {@link #union(OnDiskIndex...)}.
+ */
+public class OnDiskIndexIterator extends RangeIterator<DataTerm, CombinedTerm>
+{
+    private final AbstractType<?> comparator;
+    private final Iterator<DataTerm> terms;
+
+    /**
+     * @param index the index to iterate; its {@code min()} and {@code max()} become the range
+     *              bounds and {@code Long.MAX_VALUE} is passed as the third super-constructor
+     *              argument (presumably a count placeholder, verify against RangeIterator)
+     */
+    public OnDiskIndexIterator(OnDiskIndex index)
+    {
+        super(index.min(), index.max(), Long.MAX_VALUE);
+
+        this.comparator = index.getComparator();
+        this.terms = index.iterator();
+    }
+
+    /**
+     * Builds a union iterator over the given indexes.
+     *
+     * @param union indexes to combine; {@code null} entries are skipped
+     * @return whatever {@code RangeUnionIterator.Builder#build()} produces for the added iterators
+     */
+    public static RangeIterator<DataTerm, CombinedTerm> union(OnDiskIndex... union)
+    {
+        RangeUnionIterator.Builder<DataTerm, CombinedTerm> combined = RangeUnionIterator.builder();
+
+        for (OnDiskIndex index : union)
+        {
+            if (index == null)
+                continue;
+
+            combined.add(new OnDiskIndexIterator(index));
+        }
+
+        return combined.build();
+    }
+
+    /** Wraps the next term of the index into a {@link CombinedTerm}, or signals the end of data. */
+    protected CombinedTerm computeNext()
+    {
+        if (!terms.hasNext())
+            return endOfData();
+
+        return new CombinedTerm(comparator, terms.next());
+    }
+
+    /** Skipping is not supported by this iterator. */
+    protected void performSkipTo(DataTerm nextToken)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Nothing to release here. */
+    public void close() throws IOException
+    {}
+}