Posted to commits@cassandra.apache.org by xe...@apache.org on 2016/01/24 04:36:13 UTC

[11/14] cassandra git commit: Integrate SASI index into Cassandra

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/conf/view/PrefixTermTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/PrefixTermTree.java b/src/java/org/apache/cassandra/index/sasi/conf/view/PrefixTermTree.java
new file mode 100644
index 0000000..72b6daf
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/conf/view/PrefixTermTree.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.conf.view;
+
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.index.sasi.SSTableIndex;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.utils.trie.KeyAnalyzer;
+import org.apache.cassandra.index.sasi.utils.trie.PatriciaTrie;
+import org.apache.cassandra.index.sasi.utils.trie.Trie;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Interval;
+import org.apache.cassandra.utils.IntervalTree;
+
+import com.google.common.collect.Sets;
+
+/**
+ * This class is an extension of RangeTermTree for string terms. It is
+ * required because the interval tree can't handle a match when the search
+ * term is a prefix of the min/max of a range, so for ascii/utf8 fields we
+ * build an additional prefix trie (containing both min and max terms of the
+ * index) and union its results with the results of the interval tree lookup.
+ */
+public class PrefixTermTree extends RangeTermTree
+{
+    private final OnDiskIndexBuilder.Mode mode;
+    private final Trie<ByteBuffer, Set<SSTableIndex>> trie;
+
+    public PrefixTermTree(ByteBuffer min, ByteBuffer max,
+                          Trie<ByteBuffer, Set<SSTableIndex>> trie,
+                          IntervalTree<ByteBuffer, SSTableIndex, Interval<ByteBuffer, SSTableIndex>> ranges,
+                          OnDiskIndexBuilder.Mode mode)
+    {
+        super(min, max, ranges);
+
+        this.mode = mode;
+        this.trie = trie;
+    }
+
+    public Set<SSTableIndex> search(Expression e)
+    {
+        Map<ByteBuffer, Set<SSTableIndex>> indexes = (e == null || e.lower == null || mode == OnDiskIndexBuilder.Mode.CONTAINS)
+                                                        ? trie : trie.prefixMap(e.lower.value);
+
+        Set<SSTableIndex> view = new HashSet<>(indexes.size());
+        indexes.values().forEach(view::addAll);
+
+        return Sets.union(view, super.search(e));
+    }
+
+    public static class Builder extends RangeTermTree.Builder
+    {
+        private final PatriciaTrie<ByteBuffer, Set<SSTableIndex>> trie;
+
+        protected Builder(OnDiskIndexBuilder.Mode mode, final AbstractType<?> comparator)
+        {
+            super(mode, comparator);
+            trie = new PatriciaTrie<>(new ByteBufferKeyAnalyzer(comparator));
+        }
+
+        public void addIndex(SSTableIndex index)
+        {
+            super.addIndex(index);
+            addTerm(index.minTerm(), index);
+            addTerm(index.maxTerm(), index);
+        }
+
+        public TermTree build()
+        {
+            return new PrefixTermTree(min, max, trie, IntervalTree.build(intervals), mode);
+        }
+
+        private void addTerm(ByteBuffer term, SSTableIndex index)
+        {
+            Set<SSTableIndex> indexes = trie.get(term);
+            if (indexes == null)
+                trie.put(term, (indexes = new HashSet<>()));
+
+            indexes.add(index);
+        }
+    }
+
+    private static class ByteBufferKeyAnalyzer implements KeyAnalyzer<ByteBuffer>
+    {
+        private final AbstractType<?> comparator;
+
+        public ByteBufferKeyAnalyzer(AbstractType<?> comparator)
+        {
+            this.comparator = comparator;
+        }
+
+        /**
+         * A bit mask where only the most significant bit is set
+         */
+        private static final int MSB = 1 << (Byte.SIZE - 1);
+
+        public int compare(ByteBuffer a, ByteBuffer b)
+        {
+            return comparator.compare(a, b);
+        }
+
+        public int lengthInBits(ByteBuffer o)
+        {
+            return o.remaining() * Byte.SIZE;
+        }
+
+        public boolean isBitSet(ByteBuffer key, int bitIndex)
+        {
+            if (bitIndex >= lengthInBits(key))
+                return false;
+
+            int index = bitIndex / Byte.SIZE;
+            int bit = bitIndex % Byte.SIZE;
+            return (key.get(index) & mask(bit)) != 0;
+        }
+
+        public int bitIndex(ByteBuffer key, ByteBuffer otherKey)
+        {
+            int length = Math.max(key.remaining(), otherKey.remaining());
+
+            boolean allNull = true;
+            for (int i = 0; i < length; i++)
+            {
+                byte b1 = valueAt(key, i);
+                byte b2 = valueAt(otherKey, i);
+
+                if (b1 != b2)
+                {
+                    int xor = b1 ^ b2;
+                    for (int j = 0; j < Byte.SIZE; j++)
+                    {
+                        if ((xor & mask(j)) != 0)
+                            return (i * Byte.SIZE) + j;
+                    }
+                }
+
+                if (b1 != 0)
+                    allNull = false;
+            }
+
+            return allNull ? KeyAnalyzer.NULL_BIT_KEY : KeyAnalyzer.EQUAL_BIT_KEY;
+        }
+
+        public boolean isPrefix(ByteBuffer key, ByteBuffer prefix)
+        {
+            if (key.remaining() < prefix.remaining())
+                return false;
+
+            for (int i = 0; i < prefix.remaining(); i++)
+            {
+                if (key.get(i) != prefix.get(i))
+                    return false;
+            }
+
+            return true;
+        }
+
+        /**
+         * Returns the {@code byte} value at the given index.
+         */
+        private byte valueAt(ByteBuffer value, int index)
+        {
+            return index >= 0 && index < value.remaining() ? value.get(index) : 0;
+        }
+
+        /**
+         * Returns a bit mask where the given bit is set
+         */
+        private int mask(int bit)
+        {
+            return MSB >>> bit;
+        }
+    }
+}
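
A note on the strategy above: the union in PrefixTermTree.search exists because a pure interval lookup misses indexes whose min/max terms merely start with the query. Below is a minimal standalone sketch of that gap using plain JDK types (a sorted map stands in for the PATRICIA trie; all names are illustrative, none come from the patch):

    import java.util.*;

    public class PrefixUnionSketch
    {
        public static void main(String[] args)
        {
            // index "sstable-1" covers the term range [apple, banana]
            NavigableMap<String, Set<String>> minMaxTerms = new TreeMap<>();
            minMaxTerms.computeIfAbsent("apple", k -> new HashSet<>()).add("sstable-1");
            minMaxTerms.computeIfAbsent("banana", k -> new HashSet<>()).add("sstable-1");

            String query = "app"; // a prefix of "apple"

            // interval-style containment check fails: "app" sorts before "apple"
            boolean intervalHit = "apple".compareTo(query) <= 0 && "banana".compareTo(query) >= 0;

            // trie-style prefix lookup, emulated with a sorted-map tail view
            Set<String> prefixHits = new HashSet<>();
            for (Map.Entry<String, Set<String>> e : minMaxTerms.tailMap(query, true).entrySet())
            {
                if (!e.getKey().startsWith(query))
                    break;
                prefixHits.addAll(e.getValue());
            }

            System.out.println("interval hit: " + intervalHit); // false
            System.out.println("prefix hits:  " + prefixHits);  // [sstable-1]
        }
    }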

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
new file mode 100644
index 0000000..62e5636
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.conf.view;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.index.sasi.SSTableIndex;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Interval;
+import org.apache.cassandra.utils.IntervalTree;
+
+public class RangeTermTree implements TermTree
+{
+    protected final ByteBuffer min, max;
+    protected final IntervalTree<ByteBuffer, SSTableIndex, Interval<ByteBuffer, SSTableIndex>> rangeTree;
+
+    public RangeTermTree(ByteBuffer min, ByteBuffer max, IntervalTree<ByteBuffer, SSTableIndex, Interval<ByteBuffer, SSTableIndex>> rangeTree)
+    {
+        this.min = min;
+        this.max = max;
+        this.rangeTree = rangeTree;
+    }
+
+    public Set<SSTableIndex> search(Expression e)
+    {
+        ByteBuffer minTerm = e.lower == null ? min : e.lower.value;
+        ByteBuffer maxTerm = e.upper == null ? max : e.upper.value;
+
+        return new HashSet<>(rangeTree.search(Interval.create(minTerm, maxTerm, (SSTableIndex) null)));
+    }
+
+    public int intervalCount()
+    {
+        return rangeTree.intervalCount();
+    }
+
+    static class Builder extends TermTree.Builder
+    {
+        protected final List<Interval<ByteBuffer, SSTableIndex>> intervals = new ArrayList<>();
+
+        protected Builder(OnDiskIndexBuilder.Mode mode, AbstractType<?> comparator)
+        {
+            super(mode, comparator);
+        }
+
+        public void addIndex(SSTableIndex index)
+        {
+            intervals.add(Interval.create(index.minTerm(), index.maxTerm(), index));
+        }
+
+        public TermTree build()
+        {
+            return new RangeTermTree(min, max, IntervalTree.build(intervals));
+        }
+    }
+}
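
The search above substitutes the tree-wide min/max for a missing bound and then collects every index whose [minTerm, maxTerm] interval overlaps the query bounds. A rough stand-in for that behaviour (a linear scan instead of Cassandra's IntervalTree; strings instead of ByteBuffer terms; all names illustrative):

    import java.util.*;

    final class RangeScanSketch
    {
        // interval = { minTerm, maxTerm, indexName }
        static Set<String> search(List<String[]> intervals,
                                  String lower, String upper, // null = unbounded
                                  String treeMin, String treeMax)
        {
            String lo = lower == null ? treeMin : lower;
            String hi = upper == null ? treeMax : upper;

            Set<String> result = new HashSet<>();
            for (String[] interval : intervals)
            {
                // classic interval overlap test
                if (interval[0].compareTo(hi) <= 0 && interval[1].compareTo(lo) >= 0)
                    result.add(interval[2]);
            }
            return result;
        }

        public static void main(String[] args)
        {
            List<String[]> intervals = Arrays.asList(new String[] { "a", "f", "sstable-1" },
                                                     new String[] { "g", "m", "sstable-2" });

            // both sstables overlap the query range [e, h]
            System.out.println(search(intervals, "e", "h", "a", "m"));
        }
    }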

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/conf/view/TermTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/TermTree.java b/src/java/org/apache/cassandra/index/sasi/conf/view/TermTree.java
new file mode 100644
index 0000000..a175e22
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/conf/view/TermTree.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.conf.view;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.cassandra.index.sasi.SSTableIndex;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public interface TermTree
+{
+    Set<SSTableIndex> search(Expression e);
+
+    int intervalCount();
+
+    abstract class Builder
+    {
+        protected final OnDiskIndexBuilder.Mode mode;
+        protected final AbstractType<?> comparator;
+        protected ByteBuffer min, max;
+
+        protected Builder(OnDiskIndexBuilder.Mode mode, AbstractType<?> comparator)
+        {
+            this.mode = mode;
+            this.comparator = comparator;
+        }
+
+        public final void add(SSTableIndex index)
+        {
+            addIndex(index);
+
+            min = min == null || comparator.compare(min, index.minTerm()) > 0 ? index.minTerm() : min;
+            max = max == null || comparator.compare(max, index.maxTerm()) < 0 ? index.maxTerm() : max;
+        }
+
+        protected abstract void addIndex(SSTableIndex index);
+
+        public abstract TermTree build();
+    }
+}
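
The add method above folds every index's min/max terms into tree-wide bounds before delegating to the subclass. The folding step in isolation (strings and Comparator.naturalOrder() stand in for ByteBuffer and AbstractType; a sketch, not patch code):

    import java.util.*;

    final class MinMaxFoldSketch
    {
        public static void main(String[] args)
        {
            Comparator<String> comparator = Comparator.naturalOrder();
            String min = null, max = null;

            // each pair is one index's { minTerm, maxTerm }
            for (String[] index : new String[][] {{ "carrot", "melon" }, { "apple", "kiwi" }})
            {
                min = min == null || comparator.compare(min, index[0]) > 0 ? index[0] : min;
                max = max == null || comparator.compare(max, index[1]) < 0 ? index[1] : max;
            }

            System.out.println(min + " .. " + max); // apple .. melon
        }
    }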

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/conf/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/View.java b/src/java/org/apache/cassandra/index/sasi/conf/view/View.java
new file mode 100644
index 0000000..378c3c6
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/conf/view/View.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.conf.view;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.index.sasi.SSTableIndex;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.Interval;
+import org.apache.cassandra.utils.IntervalTree;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+public class View implements Iterable<SSTableIndex>
+{
+    private final Map<Descriptor, SSTableIndex> view;
+
+    private final TermTree termTree;
+    private final IntervalTree<ByteBuffer, SSTableIndex, Interval<ByteBuffer, SSTableIndex>> keyIntervalTree;
+
+    public View(ColumnIndex index, Set<SSTableIndex> indexes)
+    {
+        this(index, Collections.<SSTableIndex>emptyList(), Collections.<SSTableReader>emptyList(), indexes);
+    }
+
+    public View(ColumnIndex index,
+                Collection<SSTableIndex> currentView,
+                Collection<SSTableReader> oldSSTables,
+                Set<SSTableIndex> newIndexes)
+    {
+        Map<Descriptor, SSTableIndex> newView = new HashMap<>();
+
+        AbstractType<?> validator = index.getValidator();
+        TermTree.Builder termTreeBuilder = (validator instanceof AsciiType || validator instanceof UTF8Type)
+                                            ? new PrefixTermTree.Builder(index.getMode().mode, validator)
+                                            : new RangeTermTree.Builder(index.getMode().mode, validator);
+
+        List<Interval<ByteBuffer, SSTableIndex>> keyIntervals = new ArrayList<>();
+        for (SSTableIndex sstableIndex : Iterables.concat(currentView, newIndexes))
+        {
+            SSTableReader sstable = sstableIndex.getSSTable();
+            if (oldSSTables.contains(sstable) || sstable.isMarkedCompacted() || newView.containsKey(sstable.descriptor))
+            {
+                sstableIndex.release();
+                continue;
+            }
+
+            newView.put(sstable.descriptor, sstableIndex);
+
+            termTreeBuilder.add(sstableIndex);
+            keyIntervals.add(Interval.create(sstableIndex.minKey(), sstableIndex.maxKey(), sstableIndex));
+        }
+
+        this.view = newView;
+        this.termTree = termTreeBuilder.build();
+        this.keyIntervalTree = IntervalTree.build(keyIntervals);
+
+        if (keyIntervalTree.intervalCount() != termTree.intervalCount())
+            throw new IllegalStateException(String.format("mismatched sizes for intervals tree for keys vs terms: %d != %d", keyIntervalTree.intervalCount(), termTree.intervalCount()));
+    }
+
+    public Set<SSTableIndex> match(final Set<SSTableReader> scope, Expression expression)
+    {
+        return Sets.filter(termTree.search(expression), index -> scope.contains(index.getSSTable()));
+    }
+
+    public List<SSTableIndex> match(ByteBuffer minKey, ByteBuffer maxKey)
+    {
+        return keyIntervalTree.search(Interval.create(minKey, maxKey, (SSTableIndex) null));
+    }
+
+    public Iterator<SSTableIndex> iterator()
+    {
+        return view.values().iterator();
+    }
+
+    public Collection<SSTableIndex> getIndexes()
+    {
+        return view.values();
+    }
+}
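
The View constructor above merges the current view with newly attached indexes, skipping (and releasing) any index whose SSTable was removed or compacted, or that is already present under the same descriptor. The merge logic reduced to its skeleton (strings stand in for SSTableIndex/Descriptor; illustrative only):

    import java.util.*;

    final class ViewRefreshSketch
    {
        public static void main(String[] args)
        {
            List<String> currentView = Arrays.asList("sstable-1", "sstable-2");
            List<String> newIndexes  = Arrays.asList("sstable-2", "sstable-3");
            Set<String>  oldSSTables = new HashSet<>(Arrays.asList("sstable-1")); // compacted away

            List<String> all = new ArrayList<>(currentView);
            all.addAll(newIndexes);

            Map<String, String> newView = new LinkedHashMap<>();
            for (String index : all)
            {
                // the real code calls sstableIndex.release() before skipping
                if (oldSSTables.contains(index) || newView.containsKey(index))
                    continue;

                newView.put(index, index);
            }

            System.out.println(newView.keySet()); // [sstable-2, sstable-3]
        }
    }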

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
new file mode 100644
index 0000000..a719f50
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+/**
+ * Object descriptor for SSTableAttachedSecondaryIndex files. Similar to, and based upon, the sstable descriptor.
+ */
+public class Descriptor
+{
+    public static final String VERSION_AA = "aa";
+    public static final String VERSION_AB = "ab";
+    public static final String CURRENT_VERSION = VERSION_AB;
+    public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION);
+
+    public static class Version
+    {
+        public final String version;
+
+        public Version(String version)
+        {
+            this.version = version;
+        }
+
+        public String toString()
+        {
+            return version;
+        }
+    }
+
+    public final Version version;
+
+    public Descriptor(String v)
+    {
+        this.version = new Version(v);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/OnDiskBlock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskBlock.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskBlock.java
new file mode 100644
index 0000000..32cda53
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskBlock.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.index.sasi.Term;
+import org.apache.cassandra.index.sasi.utils.MappedBuffer;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public abstract class OnDiskBlock<T extends Term>
+{
+    public enum BlockType
+    {
+        POINTER, DATA
+    }
+
+    // this contains offsets of the terms and term data
+    protected final MappedBuffer blockIndex;
+    protected final int blockIndexSize;
+
+    protected final boolean hasCombinedIndex;
+    protected final TokenTree combinedIndex;
+
+    public OnDiskBlock(Descriptor descriptor, MappedBuffer block, BlockType blockType)
+    {
+        blockIndex = block;
+
+        if (blockType == BlockType.POINTER)
+        {
+            hasCombinedIndex = false;
+            combinedIndex = null;
+            blockIndexSize = block.getInt() << 1; // num terms * sizeof(short)
+            return;
+        }
+
+        long blockOffset = block.position();
+        int combinedIndexOffset = block.getInt(blockOffset + OnDiskIndexBuilder.BLOCK_SIZE);
+
+        hasCombinedIndex = (combinedIndexOffset >= 0);
+        long blockIndexOffset = blockOffset + OnDiskIndexBuilder.BLOCK_SIZE + 4 + combinedIndexOffset;
+
+        combinedIndex = hasCombinedIndex ? new TokenTree(descriptor, blockIndex.duplicate().position(blockIndexOffset)) : null;
+        blockIndexSize = block.getInt() * 2;
+    }
+
+    public SearchResult<T> search(AbstractType<?> comparator, ByteBuffer query)
+    {
+        int cmp = -1, start = 0, end = termCount() - 1, middle = 0;
+
+        T element = null;
+        while (start <= end)
+        {
+            middle = start + ((end - start) >> 1);
+            element = getTerm(middle);
+
+            cmp = element.compareTo(comparator, query);
+            if (cmp == 0)
+                return new SearchResult<>(element, cmp, middle);
+            else if (cmp < 0)
+                start = middle + 1;
+            else
+                end = middle - 1;
+        }
+
+        return new SearchResult<>(element, cmp, middle);
+    }
+
+    protected T getTerm(int index)
+    {
+        MappedBuffer dup = blockIndex.duplicate();
+        long startsAt = getTermPosition(index);
+        if (termCount() - 1 == index) // last element
+            dup.position(startsAt);
+        else
+            dup.position(startsAt).limit(getTermPosition(index + 1));
+
+        return cast(dup);
+    }
+
+    protected long getTermPosition(int idx)
+    {
+        return getTermPosition(blockIndex, idx, blockIndexSize);
+    }
+
+    protected int termCount()
+    {
+        return blockIndexSize >> 1;
+    }
+
+    protected abstract T cast(MappedBuffer data);
+
+    static long getTermPosition(MappedBuffer data, int idx, int indexSize)
+    {
+        idx <<= 1;
+        assert idx < indexSize;
+        return data.position() + indexSize + data.getShort(data.position() + idx);
+    }
+
+    public TokenTree getBlockIndex()
+    {
+        return combinedIndex;
+    }
+
+    public int minOffset(OnDiskIndex.IteratorOrder order)
+    {
+        return order == OnDiskIndex.IteratorOrder.DESC ? 0 : termCount() - 1;
+    }
+
+    public int maxOffset(OnDiskIndex.IteratorOrder order)
+    {
+        return minOffset(order) == 0 ? termCount() - 1 : 0;
+    }
+
+    public static class SearchResult<T>
+    {
+        public final T result;
+        public final int index, cmp;
+
+        public SearchResult(T result, int cmp, int index)
+        {
+            this.result = result;
+            this.index = index;
+            this.cmp = cmp;
+        }
+    }
+}
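
The binary search above deliberately deviates from the usual "-(insertionPoint + 1)" convention: on a miss it returns the last probed term together with the final comparison result and index, which lets callers (e.g. IteratorOrder.startAt in OnDiskIndex) decide whether to start iterating at, before, or after that slot. The same shape over a plain list (illustrative names only):

    import java.util.*;

    final class BlockSearchSketch
    {
        static final class SearchResult
        {
            final String result;
            final int cmp, index;

            SearchResult(String result, int cmp, int index)
            {
                this.result = result;
                this.cmp = cmp;
                this.index = index;
            }
        }

        static SearchResult search(List<String> terms, String query)
        {
            int cmp = -1, start = 0, end = terms.size() - 1, middle = 0;
            String element = null;

            while (start <= end)
            {
                middle = start + ((end - start) >> 1);
                element = terms.get(middle);

                cmp = element.compareTo(query);
                if (cmp == 0)
                    return new SearchResult(element, cmp, middle);
                else if (cmp < 0)
                    start = middle + 1;
                else
                    end = middle - 1;
            }

            // miss: cmp and middle still locate the closest probed term
            return new SearchResult(element, cmp, middle);
        }

        public static void main(String[] args)
        {
            List<String> terms = Arrays.asList("apple", "kiwi", "melon");

            SearchResult hit = search(terms, "kiwi");
            System.out.println(hit.result + " cmp=" + hit.cmp + " index=" + hit.index); // kiwi cmp=0 index=1

            SearchResult miss = search(terms, "lemon");
            System.out.println(miss.result + " cmp=" + miss.cmp + " index=" + miss.index); // melon cmp>0 index=2
        }
    }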

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
new file mode 100644
index 0000000..0f9e389
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
@@ -0,0 +1,773 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.Term;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.plan.Expression.Op;
+import org.apache.cassandra.index.sasi.utils.MappedBuffer;
+import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
+import org.apache.cassandra.index.sasi.utils.AbstractIterator;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
+import static org.apache.cassandra.index.sasi.disk.OnDiskBlock.SearchResult;
+
+public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
+{
+    public enum IteratorOrder
+    {
+        DESC(1), ASC(-1);
+
+        public final int step;
+
+        IteratorOrder(int step)
+        {
+            this.step = step;
+        }
+
+        public int startAt(OnDiskBlock<DataTerm> block, Expression e)
+        {
+            switch (this)
+            {
+                case DESC:
+                    return e.lower == null
+                            ? 0
+                            : startAt(block.search(e.validator, e.lower.value), e.lower.inclusive);
+
+                case ASC:
+                    return e.upper == null
+                            ? block.termCount() - 1
+                            : startAt(block.search(e.validator, e.upper.value), e.upper.inclusive);
+
+                default:
+                    throw new IllegalArgumentException("Unknown order: " + this);
+            }
+        }
+
+        public int startAt(SearchResult<DataTerm> found, boolean inclusive)
+        {
+            switch (this)
+            {
+                case DESC:
+                    if (found.cmp < 0)
+                        return found.index + 1;
+
+                    return inclusive || found.cmp != 0 ? found.index : found.index + 1;
+
+                case ASC:
+                    if (found.cmp < 0) // search term was bigger than the whole data set
+                        return found.index;
+                    return inclusive && (found.cmp == 0 || found.cmp < 0) ? found.index : found.index - 1;
+
+                default:
+                    throw new IllegalArgumentException("Unknown order: " + this);
+            }
+        }
+    }
+
+    public final Descriptor descriptor;
+    protected final OnDiskIndexBuilder.Mode mode;
+    protected final OnDiskIndexBuilder.TermSize termSize;
+
+    protected final AbstractType<?> comparator;
+    protected final MappedBuffer indexFile;
+    protected final long indexSize;
+
+    protected final Function<Long, DecoratedKey> keyFetcher;
+
+    protected final String indexPath;
+
+    protected final PointerLevel[] levels;
+    protected final DataLevel dataLevel;
+
+    protected final ByteBuffer minTerm, maxTerm, minKey, maxKey;
+
+    public OnDiskIndex(File index, AbstractType<?> cmp, Function<Long, DecoratedKey> keyReader)
+    {
+        keyFetcher = keyReader;
+
+        comparator = cmp;
+        indexPath = index.getAbsolutePath();
+
+        RandomAccessFile backingFile = null;
+        try
+        {
+            backingFile = new RandomAccessFile(index, "r");
+
+            descriptor = new Descriptor(backingFile.readUTF());
+
+            termSize = OnDiskIndexBuilder.TermSize.of(backingFile.readShort());
+
+            minTerm = ByteBufferUtil.readWithShortLength(backingFile);
+            maxTerm = ByteBufferUtil.readWithShortLength(backingFile);
+
+            minKey = ByteBufferUtil.readWithShortLength(backingFile);
+            maxKey = ByteBufferUtil.readWithShortLength(backingFile);
+
+            mode = OnDiskIndexBuilder.Mode.mode(backingFile.readUTF());
+
+            indexSize = backingFile.length();
+            indexFile = new MappedBuffer(new ChannelProxy(indexPath, backingFile.getChannel()));
+
+            // start of the levels
+            indexFile.position(indexFile.getLong(indexSize - 8));
+
+            int numLevels = indexFile.getInt();
+            levels = new PointerLevel[numLevels];
+            for (int i = 0; i < levels.length; i++)
+            {
+                int blockCount = indexFile.getInt();
+                levels[i] = new PointerLevel(indexFile.position(), blockCount);
+                indexFile.position(indexFile.position() + blockCount * 8);
+            }
+
+            int blockCount = indexFile.getInt();
+            dataLevel = new DataLevel(indexFile.position(), blockCount);
+        }
+        catch (IOException e)
+        {
+            throw new FSReadError(e, index);
+        }
+        finally
+        {
+            FileUtils.closeQuietly(backingFile);
+        }
+    }
+
+    public ByteBuffer minTerm()
+    {
+        return minTerm;
+    }
+
+    public ByteBuffer maxTerm()
+    {
+        return maxTerm;
+    }
+
+    public ByteBuffer minKey()
+    {
+        return minKey;
+    }
+
+    public ByteBuffer maxKey()
+    {
+        return maxKey;
+    }
+
+    public DataTerm min()
+    {
+        return dataLevel.getBlock(0).getTerm(0);
+    }
+
+    public DataTerm max()
+    {
+        DataBlock block = dataLevel.getBlock(dataLevel.blockCount - 1);
+        return block.getTerm(block.termCount() - 1);
+    }
+
+    /**
+     * Search the index file for rows which match the given expression.
+     *
+     * @param exp The expression to use for the query.
+     *
+     * @return Iterator over the tokens of all rows matching the given expression.
+     */
+    public RangeIterator<Long, Token> search(Expression exp)
+    {
+        // convert single NOT_EQ to range with exclusion
+        final Expression expression = (exp.getOp() != Op.NOT_EQ)
+                                        ? exp
+                                        : new Expression(exp).setOp(Op.RANGE)
+                                                .setLower(new Expression.Bound(minTerm, true))
+                                                .setUpper(new Expression.Bound(maxTerm, true))
+                                                .addExclusion(exp.lower.value);
+
+        List<ByteBuffer> exclusions = new ArrayList<>(expression.exclusions.size());
+
+        Iterables.addAll(exclusions, expression.exclusions.stream().filter(exclusion -> {
+            // accept only exclusions which are in the bounds of lower/upper
+            return !(expression.lower != null && comparator.compare(exclusion, expression.lower.value) < 0)
+                && !(expression.upper != null && comparator.compare(exclusion, expression.upper.value) > 0);
+        }).collect(Collectors.toList()));
+
+        Collections.sort(exclusions, comparator);
+
+        if (exclusions.size() == 0)
+            return searchRange(expression);
+
+        List<Expression> ranges = new ArrayList<>(exclusions.size());
+
+        // calculate range splits based on the sorted exclusions
+        Iterator<ByteBuffer> exclusionsIterator = exclusions.iterator();
+
+        Expression.Bound min = expression.lower, max = null;
+        while (exclusionsIterator.hasNext())
+        {
+            max = new Expression.Bound(exclusionsIterator.next(), false);
+            ranges.add(new Expression(expression).setOp(Op.RANGE).setLower(min).setUpper(max));
+            min = max;
+        }
+
+        assert max != null;
+        ranges.add(new Expression(expression).setOp(Op.RANGE).setLower(max).setUpper(expression.upper));
+
+        RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+        for (Expression e : ranges)
+        {
+            RangeIterator<Long, Token> range = searchRange(e);
+            if (range != null)
+                builder.add(range);
+        }
+
+        return builder.build();
+    }
+
+    private RangeIterator<Long, Token> searchRange(Expression range)
+    {
+        Expression.Bound lower = range.lower;
+        Expression.Bound upper = range.upper;
+
+        int lowerBlock = lower == null ? 0 : getDataBlock(lower.value);
+        int upperBlock = upper == null
+                ? dataLevel.blockCount - 1
+                // optimization so we don't have to fetch upperBlock when query has lower == upper
+                : (lower != null && comparator.compare(lower.value, upper.value) == 0) ? lowerBlock : getDataBlock(upper.value);
+
+        return (mode != OnDiskIndexBuilder.Mode.SPARSE || lowerBlock == upperBlock || upperBlock - lowerBlock <= 1)
+                ? searchPoint(lowerBlock, range)
+                : searchRange(lowerBlock, lower, upperBlock, upper);
+    }
+
+    private RangeIterator<Long, Token> searchRange(int lowerBlock, Expression.Bound lower, int upperBlock, Expression.Bound upper)
+    {
+        // if lower is at the beginning of the block that means we can just do a single iterator per block
+        SearchResult<DataTerm> lowerPosition = (lower == null) ? null : searchIndex(lower.value, lowerBlock);
+        SearchResult<DataTerm> upperPosition = (upper == null) ? null : searchIndex(upper.value, upperBlock);
+
+        RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+
+        // optimistically assume that first and last blocks are full block reads, saves at least 3 'else' conditions
+        int firstFullBlockIdx = lowerBlock, lastFullBlockIdx = upperBlock;
+
+        // 'lower' doesn't cover the whole block so we need to do a partial iteration
+        // Two reasons why that can happen:
+        //   - 'lower' is not the first element of the block
+        //   - 'lower' is first element but it's not inclusive in the query
+        if (lowerPosition != null && (lowerPosition.index > 0 || !lower.inclusive))
+        {
+            DataBlock block = dataLevel.getBlock(lowerBlock);
+            int start = (lower.inclusive || lowerPosition.cmp != 0) ? lowerPosition.index : lowerPosition.index + 1;
+
+            builder.add(block.getRange(start, block.termCount()));
+            firstFullBlockIdx = lowerBlock + 1;
+        }
+
+        if (upperPosition != null)
+        {
+            DataBlock block = dataLevel.getBlock(upperBlock);
+            int lastIndex = block.termCount() - 1;
+
+            // The same as with 'lower', but here we need to check whether 'upper' is the last element of the block,
+            // which means that we only have to get individual results if:
+            //  - it *is not* the last element, or
+            //  - it *is* but shouldn't be included (dictated by upper.inclusive)
+            if (upperPosition.index != lastIndex || !upper.inclusive)
+            {
+                int end = (upperPosition.cmp < 0 || (upperPosition.cmp == 0 && upper.inclusive))
+                                ? upperPosition.index + 1 : upperPosition.index;
+
+                builder.add(block.getRange(0, end));
+                lastFullBlockIdx = upperBlock - 1;
+            }
+        }
+
+        int totalSuperBlocks = (lastFullBlockIdx - firstFullBlockIdx) / OnDiskIndexBuilder.SUPER_BLOCK_SIZE;
+
+        // if there are no super-blocks, we can simply read all of the block iterators in sequence
+        if (totalSuperBlocks == 0)
+        {
+            for (int i = firstFullBlockIdx; i <= lastFullBlockIdx; i++)
+                builder.add(dataLevel.getBlock(i).getBlockIndex().iterator(keyFetcher));
+
+            return builder.build();
+        }
+
+        // first get all of the blocks which are aligned before the first super-block in the sequence,
+        // e.g. if the block range was (1, 9) and super-block-size = 4, we need to read blocks 1, 2, 3;
+        // blocks 4 - 7 are covered by the super-block and 8, 9 are the remainder.
+
+        int superBlockAlignedStart = firstFullBlockIdx == 0 ? 0 : (int) FBUtilities.align(firstFullBlockIdx, OnDiskIndexBuilder.SUPER_BLOCK_SIZE);
+        for (int blockIdx = firstFullBlockIdx; blockIdx < Math.min(superBlockAlignedStart, lastFullBlockIdx); blockIdx++)
+            builder.add(getBlockIterator(blockIdx));
+
+        // now read all of the super-blocks matched by the request; continuing the previous example,
+        // that's the super-block with index 1 (which covers blocks 4 to 7)
+
+        int superBlockIdx = superBlockAlignedStart / OnDiskIndexBuilder.SUPER_BLOCK_SIZE;
+        for (int offset = 0; offset < totalSuperBlocks - 1; offset++)
+            builder.add(dataLevel.getSuperBlock(superBlockIdx++).iterator());
+
+        // finally, do the remainder read; in the running example that's blocks 8 and 9, which lie
+        // past the last super-block but don't fill the next one.
+
+        int lastCoveredBlock = superBlockIdx * OnDiskIndexBuilder.SUPER_BLOCK_SIZE;
+        for (int offset = 0; offset <= (lastFullBlockIdx - lastCoveredBlock); offset++)
+            builder.add(getBlockIterator(lastCoveredBlock + offset));
+
+        return builder.build();
+    }
+
+    private RangeIterator<Long, Token> searchPoint(int lowerBlock, Expression expression)
+    {
+        Iterator<DataTerm> terms = new TermIterator(lowerBlock, expression, IteratorOrder.DESC);
+        RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+
+        while (terms.hasNext())
+        {
+            try
+            {
+                builder.add(terms.next().getTokens());
+            }
+            finally
+            {
+                expression.checkpoint();
+            }
+        }
+
+        return builder.build();
+    }
+
+    private RangeIterator<Long, Token> getBlockIterator(int blockIdx)
+    {
+        DataBlock block = dataLevel.getBlock(blockIdx);
+        return (block.hasCombinedIndex)
+                ? block.getBlockIndex().iterator(keyFetcher)
+                : block.getRange(0, block.termCount());
+    }
+
+    public Iterator<DataTerm> iteratorAt(ByteBuffer query, IteratorOrder order, boolean inclusive)
+    {
+        Expression e = new Expression("", comparator);
+        Expression.Bound bound = new Expression.Bound(query, inclusive);
+
+        switch (order)
+        {
+            case DESC:
+                e.setLower(bound);
+                break;
+
+            case ASC:
+                e.setUpper(bound);
+                break;
+
+            default:
+                throw new IllegalArgumentException("Unknown order: " + order);
+        }
+
+        return new TermIterator(levels.length == 0 ? 0 : getBlockIdx(findPointer(query), query), e, order);
+    }
+
+    private int getDataBlock(ByteBuffer query)
+    {
+        return levels.length == 0 ? 0 : getBlockIdx(findPointer(query), query);
+    }
+
+    public Iterator<DataTerm> iterator()
+    {
+        return new TermIterator(0, new Expression("", comparator), IteratorOrder.DESC);
+    }
+
+    public void close() throws IOException
+    {
+        FileUtils.closeQuietly(indexFile);
+    }
+
+    private PointerTerm findPointer(ByteBuffer query)
+    {
+        PointerTerm ptr = null;
+        for (PointerLevel level : levels)
+        {
+            if ((ptr = level.getPointer(ptr, query)) == null)
+                return null;
+        }
+
+        return ptr;
+    }
+
+    private SearchResult<DataTerm> searchIndex(ByteBuffer query, int blockIdx)
+    {
+        return dataLevel.getBlock(blockIdx).search(comparator, query);
+    }
+
+    private int getBlockIdx(PointerTerm ptr, ByteBuffer query)
+    {
+        int blockIdx = 0;
+        if (ptr != null)
+        {
+            int cmp = ptr.compareTo(comparator, query);
+            blockIdx = (cmp == 0 || cmp > 0) ? ptr.getBlock() : ptr.getBlock() + 1;
+        }
+
+        return blockIdx;
+    }
+
+    protected class PointerLevel extends Level<PointerBlock>
+    {
+        public PointerLevel(long offset, int count)
+        {
+            super(offset, count);
+        }
+
+        public PointerTerm getPointer(PointerTerm parent, ByteBuffer query)
+        {
+            return getBlock(getBlockIdx(parent, query)).search(comparator, query).result;
+        }
+
+        protected PointerBlock cast(MappedBuffer block)
+        {
+            return new PointerBlock(block);
+        }
+    }
+
+    protected class DataLevel extends Level<DataBlock>
+    {
+        protected final int superBlockCnt;
+        protected final long superBlocksOffset;
+
+        public DataLevel(long offset, int count)
+        {
+            super(offset, count);
+            long baseOffset = blockOffsets + blockCount * 8;
+            superBlockCnt = indexFile.getInt(baseOffset);
+            superBlocksOffset = baseOffset + 4;
+        }
+
+        protected DataBlock cast(MappedBuffer block)
+        {
+            return new DataBlock(block);
+        }
+
+        public OnDiskSuperBlock getSuperBlock(int idx)
+        {
+            assert idx < superBlockCnt : String.format("requested index %d is greater than super block count %d", idx, superBlockCnt);
+            long blockOffset = indexFile.getLong(superBlocksOffset + idx * 8);
+            return new OnDiskSuperBlock(indexFile.duplicate().position(blockOffset));
+        }
+    }
+
+    protected class OnDiskSuperBlock
+    {
+        private final TokenTree tokenTree;
+
+        public OnDiskSuperBlock(MappedBuffer buffer)
+        {
+            tokenTree = new TokenTree(descriptor, buffer);
+        }
+
+        public RangeIterator<Long, Token> iterator()
+        {
+            return tokenTree.iterator(keyFetcher);
+        }
+    }
+
+    protected abstract class Level<T extends OnDiskBlock>
+    {
+        protected final long blockOffsets;
+        protected final int blockCount;
+
+        public Level(long offsets, int count)
+        {
+            this.blockOffsets = offsets;
+            this.blockCount = count;
+        }
+
+        public T getBlock(int idx) throws FSReadError
+        {
+            assert idx >= 0 && idx < blockCount;
+
+            // calculate block offset and move there
+            // (the long is intentional; we just need an mmap implementation which supports long positions)
+            long blockOffset = indexFile.getLong(blockOffsets + idx * 8);
+            return cast(indexFile.duplicate().position(blockOffset));
+        }
+
+        protected abstract T cast(MappedBuffer block);
+    }
+
+    protected class DataBlock extends OnDiskBlock<DataTerm>
+    {
+        public DataBlock(MappedBuffer data)
+        {
+            super(descriptor, data, BlockType.DATA);
+        }
+
+        protected DataTerm cast(MappedBuffer data)
+        {
+            return new DataTerm(data, termSize, getBlockIndex());
+        }
+
+        public RangeIterator<Long, Token> getRange(int start, int end)
+        {
+            RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+            NavigableMap<Long, Token> sparse = new TreeMap<>();
+
+            for (int i = start; i < end; i++)
+            {
+                DataTerm term = getTerm(i);
+
+                if (term.isSparse())
+                {
+                    NavigableMap<Long, Token> tokens = term.getSparseTokens();
+                    for (Map.Entry<Long, Token> t : tokens.entrySet())
+                    {
+                        Token token = sparse.get(t.getKey());
+                        if (token == null)
+                            sparse.put(t.getKey(), t.getValue());
+                        else
+                            token.merge(t.getValue());
+                    }
+                }
+                else
+                {
+                    builder.add(term.getTokens());
+                }
+            }
+
+            PrefetchedTokensIterator prefetched = sparse.isEmpty() ? null : new PrefetchedTokensIterator(sparse);
+
+            if (builder.rangeCount() == 0)
+                return prefetched;
+
+            builder.add(prefetched);
+            return builder.build();
+        }
+    }
+
+    protected class PointerBlock extends OnDiskBlock<PointerTerm>
+    {
+        public PointerBlock(MappedBuffer block)
+        {
+            super(descriptor, block, BlockType.POINTER);
+        }
+
+        protected PointerTerm cast(MappedBuffer data)
+        {
+            return new PointerTerm(data, termSize);
+        }
+    }
+
+    public class DataTerm extends Term implements Comparable<DataTerm>
+    {
+        private final TokenTree perBlockIndex;
+
+        protected DataTerm(MappedBuffer content, OnDiskIndexBuilder.TermSize size, TokenTree perBlockIndex)
+        {
+            super(content, size);
+            this.perBlockIndex = perBlockIndex;
+        }
+
+        public RangeIterator<Long, Token> getTokens()
+        {
+            final long blockEnd = FBUtilities.align(content.position(), OnDiskIndexBuilder.BLOCK_SIZE);
+
+            if (isSparse())
+                return new PrefetchedTokensIterator(getSparseTokens());
+
+            long offset = blockEnd + 4 + content.getInt(getDataOffset() + 1);
+            return new TokenTree(descriptor, indexFile.duplicate().position(offset)).iterator(keyFetcher);
+        }
+
+        public boolean isSparse()
+        {
+            return content.get(getDataOffset()) > 0;
+        }
+
+        public NavigableMap<Long, Token> getSparseTokens()
+        {
+            long ptrOffset = getDataOffset();
+
+            byte size = content.get(ptrOffset);
+
+            assert size > 0;
+
+            NavigableMap<Long, Token> individualTokens = new TreeMap<>();
+            for (int i = 0; i < size; i++)
+            {
+                Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + (8 * i)), keyFetcher);
+
+                assert token != null;
+                individualTokens.put(token.get(), token);
+            }
+
+            return individualTokens;
+        }
+
+        public int compareTo(DataTerm other)
+        {
+            return other == null ? 1 : compareTo(comparator, other.getTerm());
+        }
+    }
+
+    protected static class PointerTerm extends Term
+    {
+        public PointerTerm(MappedBuffer content, OnDiskIndexBuilder.TermSize size)
+        {
+            super(content, size);
+        }
+
+        public int getBlock()
+        {
+            return content.getInt(getDataOffset());
+        }
+    }
+
+    private static class PrefetchedTokensIterator extends RangeIterator<Long, Token>
+    {
+        private final NavigableMap<Long, Token> tokens;
+        private PeekingIterator<Token> currentIterator;
+
+        public PrefetchedTokensIterator(NavigableMap<Long, Token> tokens)
+        {
+            super(tokens.firstKey(), tokens.lastKey(), tokens.size());
+            this.tokens = tokens;
+            this.currentIterator = Iterators.peekingIterator(tokens.values().iterator());
+        }
+
+        protected Token computeNext()
+        {
+            return currentIterator != null && currentIterator.hasNext()
+                    ? currentIterator.next()
+                    : endOfData();
+        }
+
+        protected void performSkipTo(Long nextToken)
+        {
+            currentIterator = Iterators.peekingIterator(tokens.tailMap(nextToken, true).values().iterator());
+        }
+
+        public void close() throws IOException
+        {
+            endOfData();
+        }
+    }
+
+    public AbstractType<?> getComparator()
+    {
+        return comparator;
+    }
+
+    public String getIndexPath()
+    {
+        return indexPath;
+    }
+
+    private class TermIterator extends AbstractIterator<DataTerm>
+    {
+        private final Expression e;
+        private final IteratorOrder order;
+
+        protected OnDiskBlock<DataTerm> currentBlock;
+        protected int blockIndex, offset;
+
+        private boolean checkLower = true, checkUpper = true;
+
+        public TermIterator(int startBlock, Expression expression, IteratorOrder order)
+        {
+            this.e = expression;
+            this.order = order;
+            this.blockIndex = startBlock;
+
+            nextBlock();
+        }
+
+        protected DataTerm computeNext()
+        {
+            for (;;)
+            {
+                if (currentBlock == null)
+                    return endOfData();
+
+                if (offset >= 0 && offset < currentBlock.termCount())
+                {
+                    DataTerm currentTerm = currentBlock.getTerm(nextOffset());
+
+                    if (checkLower && !e.isLowerSatisfiedBy(currentTerm))
+                        continue;
+
+                    // flip the flag right on the first bounds match
+                    // to avoid expensive comparisons
+                    checkLower = false;
+
+                    if (checkUpper && !e.isUpperSatisfiedBy(currentTerm))
+                        return endOfData();
+
+                    return currentTerm;
+                }
+
+                nextBlock();
+            }
+        }
+
+        protected void nextBlock()
+        {
+            currentBlock = null;
+
+            if (blockIndex < 0 || blockIndex >= dataLevel.blockCount)
+                return;
+
+            currentBlock = dataLevel.getBlock(nextBlockIndex());
+            offset = checkLower ? order.startAt(currentBlock, e) : currentBlock.minOffset(order);
+
+            // check the last term of the new block right away:
+            // if the expression's upper bound is satisfied by it, we can avoid
+            // doing any expensive upper-bound checks for the rest of that block.
+            checkUpper = e.hasUpper() && !e.isUpperSatisfiedBy(currentBlock.getTerm(currentBlock.maxOffset(order)));
+        }
+
+        protected int nextBlockIndex()
+        {
+            int current = blockIndex;
+            blockIndex += order.step;
+            return current;
+        }
+
+        protected int nextOffset()
+        {
+            int current = offset;
+            offset += order.step;
+            return current;
+        }
+    }
+}
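
One detail of OnDiskIndex.search worth calling out: a NOT_EQ expression is rewritten as a full [minTerm, maxTerm] range with the rejected value as an exclusion, and sorted exclusions then split the range into adjacent sub-ranges sharing non-inclusive endpoints, whose results are unioned. The splitting step on its own (strings instead of ByteBuffer terms; illustrative):

    import java.util.*;

    final class ExclusionSplitSketch
    {
        public static void main(String[] args)
        {
            String lower = "a", upper = "z";
            List<String> exclusions = Arrays.asList("f", "q"); // already sorted

            List<String[]> ranges = new ArrayList<>();
            String min = lower;
            for (String exclusion : exclusions)
            {
                // each exclusion closes one sub-range and opens the next
                ranges.add(new String[] { min, exclusion });
                min = exclusion;
            }
            ranges.add(new String[] { min, upper });

            for (String[] range : ranges)
                System.out.println("[" + range[0] + ", " + range[1] + "]");
            // [a, f] [f, q] [q, z] -- f and q are non-inclusive shared endpoints
        }
    }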

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
new file mode 100644
index 0000000..7b8f5c9
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.sa.IntegralSA;
+import org.apache.cassandra.index.sasi.sa.SA;
+import org.apache.cassandra.index.sasi.sa.TermIterator;
+import org.apache.cassandra.index.sasi.sa.SuffixSA;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import com.carrotsearch.hppc.LongArrayList;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.ShortArrayList;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OnDiskIndexBuilder
+{
+    private static final Logger logger = LoggerFactory.getLogger(OnDiskIndexBuilder.class);
+
+    public enum Mode
+    {
+        PREFIX, CONTAINS, SPARSE;
+
+        public static Mode mode(String mode)
+        {
+            return Mode.valueOf(mode.toUpperCase());
+        }
+    }
+
+    public enum TermSize
+    {
+        INT(4), LONG(8), UUID(16), VARIABLE(-1);
+
+        public final int size;
+
+        TermSize(int size)
+        {
+            this.size = size;
+        }
+
+        public boolean isConstant()
+        {
+            return this != VARIABLE;
+        }
+
+        public static TermSize of(int size)
+        {
+            switch (size)
+            {
+                case -1:
+                    return VARIABLE;
+
+                case 4:
+                    return INT;
+
+                case 8:
+                    return LONG;
+
+                case 16:
+                    return UUID;
+
+                default:
+                    throw new IllegalStateException("unknown term size: " + size);
+            }
+        }
+
+        public static TermSize sizeOf(AbstractType<?> comparator)
+        {
+            if (comparator instanceof Int32Type || comparator instanceof FloatType)
+                return INT;
+
+            if (comparator instanceof LongType || comparator instanceof DoubleType
+                    || comparator instanceof TimestampType || comparator instanceof DateType)
+                return LONG;
+
+            if (comparator instanceof TimeUUIDType || comparator instanceof UUIDType)
+                return UUID;
+
+            return VARIABLE;
+        }
+    }
+
+    public static final int BLOCK_SIZE = 4096;
+    public static final int MAX_TERM_SIZE = 1024;
+    public static final int SUPER_BLOCK_SIZE = 64;
+
+    private final List<MutableLevel<InMemoryPointerTerm>> levels = new ArrayList<>();
+    private MutableLevel<InMemoryDataTerm> dataLevel;
+
+    private final TermSize termSize;
+
+    private final AbstractType<?> keyComparator, termComparator;
+
+    private final Map<ByteBuffer, TokenTreeBuilder> terms;
+    private final Mode mode;
+
+    private ByteBuffer minKey, maxKey;
+    private long estimatedBytes;
+
+    public OnDiskIndexBuilder(AbstractType<?> keyComparator, AbstractType<?> comparator, Mode mode)
+    {
+        this.keyComparator = keyComparator;
+        this.termComparator = comparator;
+        this.terms = new HashMap<>();
+        this.termSize = TermSize.sizeOf(comparator);
+        this.mode = mode;
+    }
+
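+    // A minimal usage sketch (hypothetical types and path; this mirrors how the
+    // per-sstable writer drives the builder):
+    //
+    //   OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance,
+    //                                                       Int32Type.instance,
+    //                                                       Mode.PREFIX);
+    //   builder.add(termValue, decoratedKey, keyPositionInDataFile);
+    //   builder.finish(new File("/tmp/SI_test.db"));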
+    public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long keyPosition)
+    {
+        if (term.remaining() >= MAX_TERM_SIZE)
+        {
+            logger.error("Rejecting value (size {} bytes, maximum {} bytes).", term.remaining(), MAX_TERM_SIZE);
+            return this;
+        }
+
+        TokenTreeBuilder tokens = terms.get(term);
+        if (tokens == null)
+        {
+            terms.put(term, (tokens = new TokenTreeBuilder()));
+
+            // on-heap size estimates from jol
+            // 64 bytes for TTB + 48 bytes for TreeMap in TTB + size bytes for the term (map key)
+            estimatedBytes += 64 + 48 + term.remaining();
+        }
+
+        tokens.add((Long) key.getToken().getTokenValue(), keyPosition);
+
+        // calculate key range (based on actual key values) for current index
+        minKey = (minKey == null || keyComparator.compare(minKey, key.getKey()) > 0) ? key.getKey() : minKey;
+        maxKey = (maxKey == null || keyComparator.compare(maxKey, key.getKey()) < 0) ? key.getKey() : maxKey;
+
+        // 60 ((boolean(1)*4) + (long(8)*4) + 24) bytes for the LongOpenHashSet created when the keyPosition was added
+        // + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key).
+        // in the case of a hash collision for the token we may overestimate, but this is extremely rare
+        estimatedBytes += 60 + 40 + 8;
+
+        return this;
+    }
+
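+    // Back-of-the-envelope check of these estimates (hypothetical numbers): one million
+    // unique 8-byte terms with one key each comes to roughly
+    // 1M * ((64 + 48 + 8) + (60 + 40 + 8)) bytes, i.e. about 228 MB of estimated heap.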
+    public long estimatedMemoryUse()
+    {
+        return estimatedBytes;
+    }
+
+    private void addTerm(InMemoryDataTerm term, SequentialWriter out) throws IOException
+    {
+        InMemoryPointerTerm ptr = dataLevel.add(term);
+        if (ptr == null)
+            return;
+
+        int levelIdx = 0;
+        for (;;)
+        {
+            MutableLevel<InMemoryPointerTerm> level = getIndexLevel(levelIdx++, out);
+            if ((ptr = level.add(ptr)) == null)
+                break;
+        }
+    }
+
+    public boolean isEmpty()
+    {
+        return terms.isEmpty();
+    }
+
+    public void finish(Pair<ByteBuffer, ByteBuffer> range, File file, TermIterator terms)
+    {
+        finish(Descriptor.CURRENT, range, file, terms);
+    }
+
+    /**
+     * Finishes the index-building process by creating and populating the index file.
+     *
+     * @param indexFile The file to write index contents to.
+     *
+     * @return true if index was written successfully, false otherwise (e.g. if index was empty).
+     *
+     * @throws FSWriteError on I/O error.
+     */
+    public boolean finish(File indexFile) throws FSWriteError
+    {
+        return finish(Descriptor.CURRENT, indexFile);
+    }
+
+    @VisibleForTesting
+    protected boolean finish(Descriptor descriptor, File file) throws FSWriteError
+    {
+        // no terms means there is nothing to build
+        if (terms.isEmpty())
+            return false;
+
+        // split terms into suffixes only when the term type is text and mode is CONTAINS;
+        // otherwise (even if CONTAINS is set on a non-text type) terms are used in their original form
+        SA sa = ((termComparator instanceof UTF8Type || termComparator instanceof AsciiType) && mode == Mode.CONTAINS)
+                    ? new SuffixSA(termComparator, mode) : new IntegralSA(termComparator, mode);
+
+        for (Map.Entry<ByteBuffer, TokenTreeBuilder> term : terms.entrySet())
+            sa.add(term.getKey(), term.getValue());
+
+        finish(descriptor, Pair.create(minKey, maxKey), file, sa.finish());
+        return true;
+    }
+
+    protected void finish(Descriptor descriptor, Pair<ByteBuffer, ByteBuffer> range, File file, TermIterator terms)
+    {
+        SequentialWriter out = null;
+
+        try
+        {
+            out = new SequentialWriter(file, BLOCK_SIZE, BufferType.ON_HEAP);
+
+            out.writeUTF(descriptor.version.toString());
+
+            out.writeShort(termSize.size);
+
+            // min, max term (useful to find initial scan range from search expressions)
+            ByteBufferUtil.writeWithShortLength(terms.minTerm(), out);
+            ByteBufferUtil.writeWithShortLength(terms.maxTerm(), out);
+
+            // min, max keys covered by index (useful when searching across multiple indexes)
+            ByteBufferUtil.writeWithShortLength(range.left, out);
+            ByteBufferUtil.writeWithShortLength(range.right, out);
+
+            out.writeUTF(mode.toString());
+
+            out.skipBytes((int) (BLOCK_SIZE - out.position()));
+
+            dataLevel = mode == Mode.SPARSE ? new DataBuilderLevel(out, new MutableDataBlock(mode))
+                                            : new MutableLevel<>(out, new MutableDataBlock(mode));
+            while (terms.hasNext())
+            {
+                Pair<ByteBuffer, TokenTreeBuilder> term = terms.next();
+                addTerm(new InMemoryDataTerm(term.left, term.right), out);
+            }
+
+            dataLevel.finalFlush();
+            for (MutableLevel l : levels)
+                l.flush(); // flush all of the buffers
+
+            // and finally write levels index
+            final long levelIndexPosition = out.position();
+
+            out.writeInt(levels.size());
+            for (int i = levels.size() - 1; i >= 0; i--)
+                levels.get(i).flushMetadata();
+
+            dataLevel.flushMetadata();
+
+            out.writeLong(levelIndexPosition);
+
+            // sync contents of the output and disk,
+            // since it's not done implicitly on close
+            out.sync();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, file);
+        }
+        finally
+        {
+            FileUtils.closeQuietly(out);
+        }
+    }
+
+    private MutableLevel<InMemoryPointerTerm> getIndexLevel(int idx, SequentialWriter out)
+    {
+        if (levels.size() == 0)
+            levels.add(new MutableLevel<>(out, new MutableBlock<>()));
+
+        if (levels.size() - 1 < idx)
+        {
+            int toAdd = idx - (levels.size() - 1);
+            for (int i = 0; i < toAdd; i++)
+                levels.add(new MutableLevel<>(out, new MutableBlock<>()));
+        }
+
+        return levels.get(idx);
+    }
+
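+    // e.g. with BLOCK_SIZE = 4096, a writer positioned at 5000 is padded forward by
+    // 3192 bytes to the next boundary at 8192; positions already on a block boundary
+    // are left untouched by the guard below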
+    protected static void alignToBlock(SequentialWriter out) throws IOException
+    {
+        long endOfBlock = out.position();
+        if ((endOfBlock & (BLOCK_SIZE - 1)) != 0) // align on the block boundary if needed
+            out.skipBytes((int) (FBUtilities.align(endOfBlock, BLOCK_SIZE) - endOfBlock));
+    }
+
+    private class InMemoryTerm
+    {
+        protected final ByteBuffer term;
+
+        public InMemoryTerm(ByteBuffer term)
+        {
+            this.term = term;
+        }
+
+        public int serializedSize()
+        {
+            return (termSize.isConstant() ? 0 : 2) + term.remaining();
+        }
+
+        public void serialize(DataOutputPlus out) throws IOException
+        {
+            if (termSize.isConstant())
+                out.write(term);
+            else
+                ByteBufferUtil.writeWithShortLength(term, out);
+        }
+    }
+
+    private class InMemoryPointerTerm extends InMemoryTerm
+    {
+        protected final int blockCnt;
+
+        public InMemoryPointerTerm(ByteBuffer term, int blockCnt)
+        {
+            super(term);
+            this.blockCnt = blockCnt;
+        }
+
+        public int serializedSize()
+        {
+            return super.serializedSize() + 4;
+        }
+
+        public void serialize(DataOutputPlus out) throws IOException
+        {
+            super.serialize(out);
+            out.writeInt(blockCnt);
+        }
+    }
+
+    private class InMemoryDataTerm extends InMemoryTerm
+    {
+        private final TokenTreeBuilder keys;
+
+        public InMemoryDataTerm(ByteBuffer term, TokenTreeBuilder keys)
+        {
+            super(term);
+            this.keys = keys;
+        }
+    }
+
+    private class MutableLevel<T extends InMemoryTerm>
+    {
+        private final LongArrayList blockOffsets = new LongArrayList();
+
+        protected final SequentialWriter out;
+
+        private final MutableBlock<T> inProcessBlock;
+        private InMemoryPointerTerm lastTerm;
+
+        public MutableLevel(SequentialWriter out, MutableBlock<T> block)
+        {
+            this.out = out;
+            this.inProcessBlock = block;
+        }
+
+        /**
+         * @return the last term of the block that was just flushed, or null if no flush occurred.
+         */
+        public InMemoryPointerTerm add(T term) throws IOException
+        {
+            InMemoryPointerTerm toPromote = null;
+
+            if (!inProcessBlock.hasSpaceFor(term))
+            {
+                flush();
+                toPromote = lastTerm;
+            }
+
+            inProcessBlock.add(term);
+
+            lastTerm = new InMemoryPointerTerm(term.term, blockOffsets.size());
+            return toPromote;
+        }
+
+        public void flush() throws IOException
+        {
+            blockOffsets.add(out.position());
+            inProcessBlock.flushAndClear(out);
+        }
+
+        public void finalFlush() throws IOException
+        {
+            flush();
+        }
+
+        public void flushMetadata() throws IOException
+        {
+            flushMetadata(blockOffsets);
+        }
+
+        protected void flushMetadata(LongArrayList longArrayList) throws IOException
+        {
+            out.writeInt(longArrayList.size());
+            for (int i = 0; i < longArrayList.size(); i++)
+                out.writeLong(longArrayList.get(i));
+        }
+    }
+
+    /** builds standard data blocks as well as super blocks */
+    private class DataBuilderLevel extends MutableLevel<InMemoryDataTerm>
+    {
+        private final LongArrayList superBlockOffsets = new LongArrayList();
+
+        /** count of regular data blocks written since the current super block was initialized */
+        private int dataBlocksCnt;
+        private TokenTreeBuilder superBlockTree;
+
+        public DataBuilderLevel(SequentialWriter out, MutableBlock<InMemoryDataTerm> block)
+        {
+            super(out, block);
+            superBlockTree = new TokenTreeBuilder();
+        }
+
+        public InMemoryPointerTerm add(InMemoryDataTerm term) throws IOException
+        {
+            InMemoryPointerTerm ptr = super.add(term);
+            if (ptr != null)
+            {
+                dataBlocksCnt++;
+                flushSuperBlock(false);
+            }
+            superBlockTree.add(term.keys.getTokens());
+            return ptr;
+        }
+
+        public void flushSuperBlock(boolean force) throws IOException
+        {
+            if (dataBlocksCnt == SUPER_BLOCK_SIZE || (force && !superBlockTree.getTokens().isEmpty()))
+            {
+                superBlockOffsets.add(out.position());
+                superBlockTree.finish().write(out);
+                alignToBlock(out);
+
+                dataBlocksCnt = 0;
+                superBlockTree = new TokenTreeBuilder();
+            }
+        }
+
+        public void finalFlush() throws IOException
+        {
+            super.flush();
+            flushSuperBlock(true);
+        }
+
+        public void flushMetadata() throws IOException
+        {
+            super.flushMetadata();
+            flushMetadata(superBlockOffsets);
+        }
+    }
+
+    private static class MutableBlock<T extends InMemoryTerm>
+    {
+        protected final DataOutputBufferFixed buffer;
+        protected final ShortArrayList offsets;
+
+        public MutableBlock()
+        {
+            buffer = new DataOutputBufferFixed(BLOCK_SIZE);
+            offsets = new ShortArrayList();
+        }
+
+        public final void add(T term) throws IOException
+        {
+            offsets.add((short) buffer.position());
+            addInternal(term);
+        }
+
+        protected void addInternal(T term) throws IOException
+        {
+            term.serialize(buffer);
+        }
+
+        public boolean hasSpaceFor(T element)
+        {
+            return sizeAfter(element) < BLOCK_SIZE;
+        }
+
+        protected int sizeAfter(T element)
+        {
+            return getWatermark() + 4 + element.serializedSize();
+        }
+
+        protected int getWatermark()
+        {
+            return 4 + offsets.size() * 2 + (int) buffer.position();
+        }
+
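+        // on-disk layout of a flushed block:
+        //   [term count : int][term offsets : short * count][serialized terms][padding to BLOCK_SIZE]
+        // (MutableDataBlock appends token-tree data after the aligned block, see below)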
+        public void flushAndClear(SequentialWriter out) throws IOException
+        {
+            out.writeInt(offsets.size());
+            for (int i = 0; i < offsets.size(); i++)
+                out.writeShort(offsets.get(i));
+
+            out.write(buffer.buffer());
+
+            alignToBlock(out);
+
+            offsets.clear();
+            buffer.clear();
+        }
+    }
+
+    private static class MutableDataBlock extends MutableBlock<InMemoryDataTerm>
+    {
+        private final Mode mode;
+
+        private int offset = 0;
+        private int sparseValueTerms = 0;
+
+        private final List<TokenTreeBuilder> containers = new ArrayList<>();
+        private TokenTreeBuilder combinedIndex;
+
+        public MutableDataBlock(Mode mode)
+        {
+            this.mode = mode;
+            this.combinedIndex = new TokenTreeBuilder();
+        }
+
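+        // a term either carries its tokens inline ("sparse" encoding: a 1-byte token
+        // count followed by the raw 8-byte tokens) or points at a token tree (a 0x0
+        // marker byte plus a 4-byte offset into the trees appended after the block);
+        // see the two writeTerm overloads below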
+        protected void addInternal(InMemoryDataTerm term) throws IOException
+        {
+            TokenTreeBuilder keys = term.keys;
+
+            if (mode == Mode.SPARSE && keys.getTokenCount() <= 5)
+            {
+                writeTerm(term, keys);
+                sparseValueTerms++;
+            }
+            else
+            {
+                writeTerm(term, offset);
+
+                offset += keys.serializedSize();
+                containers.add(keys);
+            }
+
+            if (mode == Mode.SPARSE)
+                combinedIndex.add(keys.getTokens());
+        }
+
+        protected int sizeAfter(InMemoryDataTerm element)
+        {
+            return super.sizeAfter(element) + ptrLength(element);
+        }
+
+        public void flushAndClear(SequentialWriter out) throws IOException
+        {
+            super.flushAndClear(out);
+
+            out.writeInt((sparseValueTerms == 0) ? -1 : offset);
+
+            if (containers.size() > 0)
+            {
+                for (TokenTreeBuilder tokens : containers)
+                    tokens.write(out);
+            }
+
+            if (sparseValueTerms > 0)
+            {
+                combinedIndex.finish().write(out);
+            }
+
+            alignToBlock(out);
+
+            containers.clear();
+            combinedIndex = new TokenTreeBuilder();
+
+            offset = 0;
+            sparseValueTerms = 0;
+        }
+
+        private int ptrLength(InMemoryDataTerm term)
+        {
+            return (term.keys.getTokenCount() > 5)
+                    ? 5 // 1 byte type + 4 byte offset to the tree
+                    : 1 + (8 * (int) term.keys.getTokenCount()); // 1 byte size + n 8 byte tokens
+        }
+
+        private void writeTerm(InMemoryTerm term, TokenTreeBuilder keys) throws IOException
+        {
+            term.serialize(buffer);
+            buffer.writeByte((byte) keys.getTokenCount());
+
+            Iterator<Pair<Long, LongSet>> tokens = keys.iterator();
+            while (tokens.hasNext())
+                buffer.writeLong(tokens.next().left);
+        }
+
+        private void writeTerm(InMemoryTerm term, int offset) throws IOException
+        {
+            term.serialize(buffer);
+            buffer.writeByte(0x0);
+            buffer.writeInt(offset);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
new file mode 100644
index 0000000..6e63c71
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.utils.CombinedTermIterator;
+import org.apache.cassandra.index.sasi.utils.TypeUtil;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerSSTableIndexWriter implements SSTableFlushObserver
+{
+    private static final Logger logger = LoggerFactory.getLogger(PerSSTableIndexWriter.class);
+
+    private static final ThreadPoolExecutor INDEX_FLUSHER_MEMTABLE;
+    private static final ThreadPoolExecutor INDEX_FLUSHER_GENERAL;
+
+    static
+    {
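+        // two separate pools so that segment flushes triggered by memtable flushes and
+        // by compactions don't queue behind each other; getExecutor() picks the pool
+        // based on the OperationType of the writer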
+        INDEX_FLUSHER_GENERAL = new JMXEnabledThreadPoolExecutor(1, 8, 60, TimeUnit.SECONDS,
+                                                                 new LinkedBlockingQueue<>(),
+                                                                 new NamedThreadFactory("SASI-General"),
+                                                                 "internal");
+        INDEX_FLUSHER_GENERAL.allowCoreThreadTimeOut(true);
+
+        INDEX_FLUSHER_MEMTABLE = new JMXEnabledThreadPoolExecutor(1, 8, 60, TimeUnit.SECONDS,
+                                                                  new LinkedBlockingQueue<>(),
+                                                                  new NamedThreadFactory("SASI-Memtable"),
+                                                                  "internal");
+        INDEX_FLUSHER_MEMTABLE.allowCoreThreadTimeOut(true);
+    }
+
+    private final int nowInSec = FBUtilities.nowInSeconds();
+
+    private final Descriptor descriptor;
+    private final OperationType source;
+
+    private final AbstractType<?> keyValidator;
+    private final Map<ColumnDefinition, ColumnIndex> supportedIndexes;
+
+    @VisibleForTesting
+    protected final Map<ColumnDefinition, Index> indexes;
+
+    private DecoratedKey currentKey;
+    private long currentKeyPosition;
+    private boolean isComplete;
+
+    public PerSSTableIndexWriter(AbstractType<?> keyValidator,
+                                 Descriptor descriptor,
+                                 OperationType source,
+                                 Map<ColumnDefinition, ColumnIndex> supportedIndexes)
+    {
+        this.keyValidator = keyValidator;
+        this.descriptor = descriptor;
+        this.source = source;
+        this.supportedIndexes = supportedIndexes;
+        this.indexes = new HashMap<>();
+    }
+
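+    // SSTableFlushObserver lifecycle, as driven by the SSTable writer: begin(), then for
+    // each partition startPartition(key, position) followed by nextUnfilteredCluster(...)
+    // per row, and finally complete(), which flushes all per-column index segments
+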
+    public void begin()
+    {}
+
+    public void startPartition(DecoratedKey key, long curPosition)
+    {
+        currentKey = key;
+        currentKeyPosition = curPosition;
+    }
+
+    public void nextUnfilteredCluster(Unfiltered unfiltered)
+    {
+        if (!unfiltered.isRow())
+            return;
+
+        Row row = (Row) unfiltered;
+
+        supportedIndexes.keySet().forEach((column) -> {
+            ByteBuffer value = ColumnIndex.getValueOf(column, row, nowInSec);
+            if (value == null)
+                return;
+
+            ColumnIndex columnIndex = supportedIndexes.get(column);
+            if (columnIndex == null)
+                return;
+
+            Index index = indexes.get(column);
+            if (index == null)
+                indexes.put(column, (index = new Index(columnIndex)));
+
+            index.add(value.duplicate(), currentKey, currentKeyPosition);
+        });
+    }
+
+    public void complete()
+    {
+        if (isComplete)
+            return;
+
+        currentKey = null;
+
+        try
+        {
+            CountDownLatch latch = new CountDownLatch(indexes.size());
+            for (Index index : indexes.values())
+                index.complete(latch);
+
+            Uninterruptibles.awaitUninterruptibly(latch);
+        }
+        finally
+        {
+            indexes.clear();
+            isComplete = true;
+        }
+    }
+
+    public Index getIndex(ColumnDefinition columnDef)
+    {
+        return indexes.get(columnDef);
+    }
+
+    public Descriptor getDescriptor()
+    {
+        return descriptor;
+    }
+
+    @VisibleForTesting
+    protected class Index
+    {
+        private final ColumnIndex columnIndex;
+        private final String outputFile;
+        private final AbstractAnalyzer analyzer;
+        private final long maxMemorySize;
+
+        @VisibleForTesting
+        protected final Set<Future<OnDiskIndex>> segments;
+        private int segmentNumber = 0;
+
+        private OnDiskIndexBuilder currentBuilder;
+
+        public Index(ColumnIndex columnIndex)
+        {
+            this.columnIndex = columnIndex;
+            this.outputFile = descriptor.filenameFor(columnIndex.getComponent());
+            this.analyzer = columnIndex.getAnalyzer();
+            this.segments = new HashSet<>();
+            this.maxMemorySize = maxMemorySize(columnIndex);
+            this.currentBuilder = newIndexBuilder();
+        }
+
+        public void add(ByteBuffer term, DecoratedKey key, long keyPosition)
+        {
+            if (term.remaining() == 0)
+                return;
+
+            boolean isAdded = false;
+
+            analyzer.reset(term);
+            while (analyzer.hasNext())
+            {
+                ByteBuffer token = analyzer.next();
+                int size = token.remaining();
+
+                if (token.remaining() >= OnDiskIndexBuilder.MAX_TERM_SIZE)
+                {
+                    logger.info("Rejecting value (size {}, maximum {} bytes) for column {} (analyzed {}) at {} SSTable.",
+                            term.remaining(),
+                            OnDiskIndexBuilder.MAX_TERM_SIZE,
+                            columnIndex.getColumnName(),
+                            columnIndex.getMode().isAnalyzed,
+                            descriptor);
+                    continue;
+                }
+
+                if (!TypeUtil.isValid(token, columnIndex.getValidator()))
+                {
+                    if ((token = TypeUtil.tryUpcast(token, columnIndex.getValidator())) == null)
+                    {
+                        logger.info("({}) Failed to add {} to index for key: {}, value size was {} bytes, validator is {}.",
+                                    outputFile,
+                                    columnIndex.getColumnName(),
+                                    keyValidator.getString(key.getKey()),
+                                    size,
+                                    columnIndex.getValidator());
+                        continue;
+                    }
+                }
+
+                currentBuilder.add(token, key, keyPosition);
+                isAdded = true;
+            }
+
+            if (!isAdded || currentBuilder.estimatedMemoryUse() < maxMemorySize)
+                return; // none of the generated tokens were added to the index, or the memory threshold hasn't been reached
+
+            segments.add(getExecutor().submit(scheduleSegmentFlush(false)));
+        }
+
+        @VisibleForTesting
+        protected Callable<OnDiskIndex> scheduleSegmentFlush(final boolean isFinal)
+        {
+            final OnDiskIndexBuilder builder = currentBuilder;
+            currentBuilder = newIndexBuilder();
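+
+            // the builder swap above happens eagerly in the caller's thread, so
+            // subsequent add() calls accumulate into a fresh builder while the
+            // captured one is flushed by the returned callable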
+
+            final String segmentFile = filename(isFinal);
+
+            return () -> {
+                long start = System.nanoTime();
+
+                try
+                {
+                    File index = new File(segmentFile);
+                    return builder.finish(index) ? new OnDiskIndex(index, columnIndex.getValidator(), null) : null;
+                }
+                finally
+                {
+                    if (!isFinal)
+                        logger.info("Flushed index segment {}, took {} ms.", segmentFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start1));
+                }
+            };
+        }
+
+        public void complete(final CountDownLatch latch)
+        {
+            logger.info("Scheduling index flush to {}", outputFile);
+
+            getExecutor().submit((Runnable) () -> {
+                long start = System.nanoTime();
+
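+                // segments.size() + 1 slots: one extra for the in-memory remainder
+                // that may be flushed inline below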
+                OnDiskIndex[] parts = new OnDiskIndex[segments.size() + 1];
+
+                try
+                {
+                    // no parts present, build entire index from memory
+                    if (segments.isEmpty())
+                    {
+                        scheduleSegmentFlush(true).call();
+                        return;
+                    }
+
+                    // parts are present but there is something still in memory, let's flush that inline
+                    if (!currentBuilder.isEmpty())
+                    {
+                        OnDiskIndex last = scheduleSegmentFlush(false).call();
+                        segments.add(Futures.immediateFuture(last));
+                    }
+
+                    int index = 0;
+                    ByteBuffer combinedMin = null, combinedMax = null;
+
+                    for (Future<OnDiskIndex> f : segments)
+                    {
+                        OnDiskIndex part = Futures.getUnchecked(f);
+                        if (part == null)
+                            continue;
+
+                        parts[index++] = part;
+                        combinedMin = (combinedMin == null || keyValidator.compare(combinedMin, part.minKey()) > 0) ? part.minKey() : combinedMin;
+                        combinedMax = (combinedMax == null || keyValidator.compare(combinedMax, part.maxKey()) < 0) ? part.maxKey() : combinedMax;
+                    }
+
+                    OnDiskIndexBuilder builder = newIndexBuilder();
+                    builder.finish(Pair.create(combinedMin, combinedMax),
+                                   new File(outputFile),
+                                   new CombinedTermIterator(parts));
+                }
+                catch (Exception e)
+                {
+                    logger.error("Failed to flush index {}.", outputFile, e);
+                    FileUtils.delete(outputFile);
+                }
+                finally
+                {
+                    logger.info("Index flush to {} took {} ms.", outputFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start1));
+
+                    for (OnDiskIndex part : parts)
+                    {
+                        if (part == null)
+                            continue;
+
+                        FileUtils.closeQuietly(part);
+                        FileUtils.delete(part.getIndexPath());
+                    }
+
+                    latch.countDown();
+                }
+            });
+        }
+
+        private ExecutorService getExecutor()
+        {
+            return source == OperationType.FLUSH ? INDEX_FLUSHER_MEMTABLE : INDEX_FLUSHER_GENERAL;
+        }
+
+        private OnDiskIndexBuilder newIndexBuilder()
+        {
+            return new OnDiskIndexBuilder(keyValidator, columnIndex.getValidator(), columnIndex.getMode().mode);
+        }
+
+        public String filename(boolean isFinal)
+        {
+            return outputFile + (isFinal ? "" : "_" + segmentNumber++);
+        }
+    }
+
+    protected long maxMemorySize(ColumnIndex columnIndex)
+    {
+        // hard-coded 1G limit for memtable flushes; per-index configured limit for compactions
+        return source == OperationType.FLUSH ? 1073741824L : columnIndex.getMode().maxCompactionFlushMemoryInMb;
+    }
+
+    public int hashCode()
+    {
+        return descriptor.hashCode();
+    }
+
+    public boolean equals(Object o)
+    {
+        return o instanceof PerSSTableIndexWriter && descriptor.equals(((PerSSTableIndexWriter) o).descriptor);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Token.java b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
new file mode 100644
index 0000000..02130a3
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import com.google.common.primitives.Longs;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.utils.CombinedValue;
+
+public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKey>
+{
+    protected final long token;
+
+    public Token(long token)
+    {
+        this.token = token;
+    }
+
+    public Long get()
+    {
+        return token;
+    }
+
+    public int compareTo(CombinedValue<Long> o)
+    {
+        return Longs.compare(token, ((Token) o).token);
+    }
+}