Posted to commits@cassandra.apache.org by xe...@apache.org on 2016/03/28 00:21:42 UTC

[2/2] cassandra git commit: Avoid index segment stitching in RAM which leads to OOM on big SSTable files

Avoid index segment stitching in RAM which leads to OOM on big SSTable files

patch by jrwest and xedin; reviewed by xedin for CASSANDRA-11383


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5c4d5c73
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5c4d5c73
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5c4d5c73

Branch: refs/heads/cassandra-3.5
Commit: 5c4d5c731f1299ba310c81603914a1a8956e644c
Parents: f6c5d72
Author: Jordan West <jo...@gmail.com>
Authored: Mon Mar 21 12:00:31 2016 -0700
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Sun Mar 27 15:21:16 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../sasi/disk/AbstractTokenTreeBuilder.java     | 672 ++++++++++++++++
 .../sasi/disk/DynamicTokenTreeBuilder.java      | 189 +++++
 .../index/sasi/disk/OnDiskIndexBuilder.java     |  52 +-
 .../index/sasi/disk/PerSSTableIndexWriter.java  |  37 +-
 .../index/sasi/disk/StaticTokenTreeBuilder.java | 266 ++++++
 .../apache/cassandra/index/sasi/disk/Token.java |   5 +
 .../cassandra/index/sasi/disk/TokenTree.java    |   6 +-
 .../index/sasi/disk/TokenTreeBuilder.java       | 805 +------------------
 .../index/sasi/memory/KeyRangeIterator.java     |  11 +
 .../cassandra/index/sasi/sa/SuffixSA.java       |   7 +-
 .../index/sasi/utils/CombinedTerm.java          |  46 +-
 .../index/sasi/disk/OnDiskIndexTest.java        |  20 +-
 .../sasi/disk/PerSSTableIndexWriterTest.java    |  90 +++
 .../index/sasi/disk/TokenTreeTest.java          | 217 +++--
 .../index/sasi/utils/LongIterator.java          |   8 +
 16 files changed, 1482 insertions(+), 950 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f86c91f..2907df9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.5
+ * Avoid index segment stitching in RAM which leads to OOM on big SSTable files (CASSANDRA-11383)
  * Fix clustering and row filters for LIKE queries on clustering columns (CASSANDRA-11397)
 Merged from 3.0:
  * Enable SO_REUSEADDR for JMX RMI server sockets (CASSANDRA-11093)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
new file mode 100644
index 0000000..4e93b2b
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
@@ -0,0 +1,672 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import com.carrotsearch.hppc.LongArrayList;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
+
+public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
+{
+    protected int numBlocks;
+    protected Node root;
+    protected InteriorNode rightmostParent;
+    protected Leaf leftmostLeaf;
+    protected Leaf rightmostLeaf;
+    protected long tokenCount = 0;
+    protected long treeMinToken;
+    protected long treeMaxToken;
+
+    public void add(TokenTreeBuilder other)
+    {
+        add(other.iterator());
+    }
+
+    public TokenTreeBuilder finish()
+    {
+        if (root == null)
+            constructTree();
+
+        return this;
+    }
+
+    public long getTokenCount()
+    {
+        return tokenCount;
+    }
+
+    public int serializedSize()
+    {
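+        // a single-block tree is just its header plus 16 bytes per token entry;
+        // multi-block trees are padded out to whole BLOCK_BYTES-sized blocks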
+        if (numBlocks == 1)
+            return (BLOCK_HEADER_BYTES + ((int) tokenCount * 16));
+        else
+            return numBlocks * BLOCK_BYTES;
+    }
+
+    public void write(DataOutputPlus out) throws IOException
+    {
+        ByteBuffer blockBuffer = ByteBuffer.allocate(BLOCK_BYTES);
+        Iterator<Node> levelIterator = root.levelIterator();
+        long childBlockIndex = 1;
+
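+        // serialize level by level, root block first; childBlockIndex tracks the
+        // on-disk block number at which the current node's children will be written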
+        while (levelIterator != null)
+        {
+            Node firstChild = null;
+            while (levelIterator.hasNext())
+            {
+                Node block = levelIterator.next();
+
+                if (firstChild == null && !block.isLeaf())
+                    firstChild = ((InteriorNode) block).children.get(0);
+
+                if (block.isSerializable())
+                {
+                    block.serialize(childBlockIndex, blockBuffer);
+                    flushBuffer(blockBuffer, out, numBlocks != 1);
+                }
+
+                childBlockIndex += block.childCount();
+            }
+
+            levelIterator = (firstChild == null) ? null : firstChild.levelIterator();
+        }
+    }
+
+    protected abstract void constructTree();
+
+    protected void flushBuffer(ByteBuffer buffer, DataOutputPlus o, boolean align) throws IOException
+    {
+        // seek to end of last block before flushing
+        if (align)
+            alignBuffer(buffer, BLOCK_BYTES);
+
+        buffer.flip();
+        o.write(buffer);
+        buffer.clear();
+    }
+
+    protected abstract class Node
+    {
+        protected InteriorNode parent;
+        protected Node next;
+        protected Long nodeMinToken, nodeMaxToken;
+
+        public Node(Long minToken, Long maxToken)
+        {
+            nodeMinToken = minToken;
+            nodeMaxToken = maxToken;
+        }
+
+        public abstract boolean isSerializable();
+        public abstract void serialize(long childBlockIndex, ByteBuffer buf);
+        public abstract int childCount();
+        public abstract int tokenCount();
+
+        public Long smallestToken()
+        {
+            return nodeMinToken;
+        }
+
+        public Long largestToken()
+        {
+            return nodeMaxToken;
+        }
+
+        public Iterator<Node> levelIterator()
+        {
+            return new LevelIterator(this);
+        }
+
+        public boolean isLeaf()
+        {
+            return (this instanceof Leaf);
+        }
+
+        protected boolean isLastLeaf()
+        {
+            return this == rightmostLeaf;
+        }
+
+        protected boolean isRoot()
+        {
+            return this == root;
+        }
+
+        protected void updateTokenRange(long token)
+        {
+            nodeMinToken = nodeMinToken == null ? token : Math.min(nodeMinToken, token);
+            nodeMaxToken = nodeMaxToken == null ? token : Math.max(nodeMaxToken, token);
+        }
+
+        protected void serializeHeader(ByteBuffer buf)
+        {
+            Header header;
+            if (isRoot())
+                header = new RootHeader();
+            else if (!isLeaf())
+                header = new InteriorNodeHeader();
+            else
+                header = new LeafHeader();
+
+            header.serialize(buf);
+            alignBuffer(buf, BLOCK_HEADER_BYTES);
+        }
+
+        private abstract class Header
+        {
+            public void serialize(ByteBuffer buf)
+            {
+                buf.put(infoByte())
+                   .putShort((short) (tokenCount()))
+                   .putLong(nodeMinToken)
+                   .putLong(nodeMaxToken);
+            }
+
+            protected abstract byte infoByte();
+        }
+
+        private class RootHeader extends Header
+        {
+            public void serialize(ByteBuffer buf)
+            {
+                super.serialize(buf);
+                writeMagic(buf);
+                buf.putLong(tokenCount)
+                   .putLong(treeMinToken)
+                   .putLong(treeMaxToken);
+            }
+
+            protected byte infoByte()
+            {
+                // if leaf, set leaf indicator and last leaf indicator (bits 0 & 1)
+                // if not leaf, clear both bits
+                return (byte) ((isLeaf()) ? 3 : 0);
+            }
+
+            protected void writeMagic(ByteBuffer buf)
+            {
+                switch (Descriptor.CURRENT_VERSION)
+                {
+                    case Descriptor.VERSION_AB:
+                        buf.putShort(AB_MAGIC);
+                        break;
+
+                    default:
+                        break;
+                }
+
+            }
+        }
+
+        private class InteriorNodeHeader extends Header
+        {
+            // bit 0 (leaf indicator) & bit 1 (last leaf indicator) cleared
+            protected byte infoByte()
+            {
+                return 0;
+            }
+        }
+
+        private class LeafHeader extends Header
+        {
+            // bit 0 set as leaf indicator
+            // bit 1 set if this is last leaf of data
+            protected byte infoByte()
+            {
+                byte infoByte = 1;
+                infoByte |= (isLastLeaf()) ? (1 << LAST_LEAF_SHIFT) : 0;
+
+                return infoByte;
+            }
+        }
+
+    }
+
+    protected abstract class Leaf extends Node
+    {
+        protected LongArrayList overflowCollisions;
+
+        public Leaf(Long minToken, Long maxToken)
+        {
+            super(minToken, maxToken);
+        }
+
+        public int childCount()
+        {
+            return 0;
+        }
+
+        protected void serializeOverflowCollisions(ByteBuffer buf)
+        {
+            if (overflowCollisions != null)
+                for (LongCursor offset : overflowCollisions)
+                    buf.putLong(offset.value);
+        }
+
+        public void serialize(long childBlockIndex, ByteBuffer buf)
+        {
+            serializeHeader(buf);
+            serializeData(buf);
+            serializeOverflowCollisions(buf);
+        }
+
+        protected abstract void serializeData(ByteBuffer buf);
+
+        protected LeafEntry createEntry(final long tok, final LongSet offsets)
+        {
+            int offsetCount = offsets.size();
+            switch (offsetCount)
+            {
+                case 0:
+                    throw new AssertionError("no offsets for token " + tok);
+                case 1:
+                    long offset = offsets.toArray()[0];
+                    if (offset > MAX_OFFSET)
+                        throw new AssertionError("offset " + offset + " cannot be greater than " + MAX_OFFSET);
+                    else if (offset <= Integer.MAX_VALUE)
+                        return new SimpleLeafEntry(tok, offset);
+                    else
+                        return new FactoredOffsetLeafEntry(tok, offset);
+                case 2:
+                    long[] rawOffsets = offsets.toArray();
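+                    // two offsets can be packed only if both fit in an int and at least one also fits in a short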
+                    if (rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= Integer.MAX_VALUE &&
+                        (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= Short.MAX_VALUE))
+                        return new PackedCollisionLeafEntry(tok, rawOffsets);
+                    else
+                        return createOverflowEntry(tok, offsetCount, offsets);
+                default:
+                    return createOverflowEntry(tok, offsetCount, offsets);
+            }
+        }
+
+        private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongSet offsets)
+        {
+            if (overflowCollisions == null)
+                overflowCollisions = new LongArrayList();
+
+            LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) overflowCollisions.size(), (short) offsetCount);
+            for (LongCursor o : offsets)
+            {
+                if (overflowCollisions.size() == OVERFLOW_TRAILER_CAPACITY)
+                    throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf");
+                else
+                    overflowCollisions.add(o.value);
+            }
+            return entry;
+        }
+
+        protected abstract class LeafEntry
+        {
+            protected final long token;
+
+            abstract public EntryType type();
+            abstract public int offsetData();
+            abstract public short offsetExtra();
+
+            public LeafEntry(final long tok)
+            {
+                token = tok;
+            }
+
+            public void serialize(ByteBuffer buf)
+            {
+                buf.putShort((short) type().ordinal())
+                   .putShort(offsetExtra())
+                   .putLong(token)
+                   .putInt(offsetData());
+            }
+
+        }
+
+
+        // assumes there is a single offset and the offset is <= Integer.MAX_VALUE
+        protected class SimpleLeafEntry extends LeafEntry
+        {
+            private final long offset;
+
+            public SimpleLeafEntry(final long tok, final long off)
+            {
+                super(tok);
+                offset = off;
+            }
+
+            public EntryType type()
+            {
+                return EntryType.SIMPLE;
+            }
+
+            public int offsetData()
+            {
+                return (int) offset;
+            }
+
+            public short offsetExtra()
+            {
+                return 0;
+            }
+        }
+
+        // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET
+        // take the middle 32 bits of offset (or the top 32 when considering offset is max 48 bits)
+        // and store where offset is normally stored. take bottom 16 bits of offset and store in entry header
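+        // e.g. the 48-bit offset 0x1234_5678_9ABC is stored as offsetData = 0x1234_5678
+        // and offsetExtra = (short) 0x9ABC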
+        private class FactoredOffsetLeafEntry extends LeafEntry
+        {
+            private final long offset;
+
+            public FactoredOffsetLeafEntry(final long tok, final long off)
+            {
+                super(tok);
+                offset = off;
+            }
+
+            public EntryType type()
+            {
+                return EntryType.FACTORED;
+            }
+
+            public int offsetData()
+            {
+                return (int) (offset >>> Short.SIZE);
+            }
+
+            public short offsetExtra()
+            {
+                return (short) offset;
+            }
+        }
+
+        // holds an entry with two offsets that can be packed in an int & a short
+        // the int offset is stored where offset is normally stored. short offset is
+        // stored in entry header
+        private class PackedCollisionLeafEntry extends LeafEntry
+        {
+            private short smallerOffset;
+            private int largerOffset;
+
+            public PackedCollisionLeafEntry(final long tok, final long[] offs)
+            {
+                super(tok);
+
+                smallerOffset = (short) Math.min(offs[0], offs[1]);
+                largerOffset = (int) Math.max(offs[0], offs[1]);
+            }
+
+            public EntryType type()
+            {
+                return EntryType.PACKED;
+            }
+
+            public int offsetData()
+            {
+                return largerOffset;
+            }
+
+            public short offsetExtra()
+            {
+                return smallerOffset;
+            }
+        }
+
+        // holds an entry with three or more offsets, or two offsets that cannot
+        // be packed into an int & a short. the index into the overflow list
+        // is stored where the offset is normally stored. the number of overflowed offsets
+        // for the entry is stored in the entry header
+        private class OverflowCollisionLeafEntry extends LeafEntry
+        {
+            private final short startIndex;
+            private final short count;
+
+            public OverflowCollisionLeafEntry(final long tok, final short collisionStartIndex, final short collisionCount)
+            {
+                super(tok);
+                startIndex = collisionStartIndex;
+                count = collisionCount;
+            }
+
+            public EntryType type()
+            {
+                return EntryType.OVERFLOW;
+            }
+
+            public int offsetData()
+            {
+                return startIndex;
+            }
+
+            public short offsetExtra()
+            {
+                return count;
+            }
+
+        }
+
+    }
+
+    protected class InteriorNode extends Node
+    {
+        protected List<Long> tokens = new ArrayList<>(TOKENS_PER_BLOCK);
+        protected List<Node> children = new ArrayList<>(TOKENS_PER_BLOCK + 1);
+        protected int position = 0;
+
+        public InteriorNode()
+        {
+            super(null, null);
+        }
+
+        public boolean isSerializable()
+        {
+            return true;
+        }
+
+        public void serialize(long childBlockIndex, ByteBuffer buf)
+        {
+            serializeHeader(buf);
+            serializeTokens(buf);
+            serializeChildOffsets(childBlockIndex, buf);
+        }
+
+        public int childCount()
+        {
+            return children.size();
+        }
+
+        public int tokenCount()
+        {
+            return tokens.size();
+        }
+
+        public Long smallestToken()
+        {
+            return tokens.get(0);
+        }
+
+        protected void add(Long token, InteriorNode leftChild, InteriorNode rightChild)
+        {
+            int pos = tokens.size();
+            if (pos == TOKENS_PER_BLOCK)
+            {
+                InteriorNode sibling = split();
+                sibling.add(token, leftChild, rightChild);
+
+            }
+            else
+            {
+                if (leftChild != null)
+                    children.add(pos, leftChild);
+
+                if (rightChild != null)
+                {
+                    children.add(pos + 1, rightChild);
+                    rightChild.parent = this;
+                }
+
+                updateTokenRange(token);
+                tokens.add(pos, token);
+            }
+        }
+
+        protected void add(Leaf node)
+        {
+
+            if (position == (TOKENS_PER_BLOCK + 1))
+            {
+                rightmostParent = split();
+                rightmostParent.add(node);
+            }
+            else
+            {
+                node.parent = this;
+                children.add(position, node);
+                position++;
+
+                // the first child is referenced only during bulk load; no separator value
+                // is stored in the tree for it. One is subtracted because position has
+                // already been incremented for the next node to be added.
+                if (position - 1 == 0)
+                    return;
+
+                // tokens are inserted one behind the current position, but 2 is subtracted because
+                // position has already been incremented for the next add
+                Long smallestToken = node.smallestToken();
+                updateTokenRange(smallestToken);
+                tokens.add(position - 2, smallestToken);
+            }
+
+        }
+
+        protected InteriorNode split()
+        {
+            Pair<Long, InteriorNode> splitResult = splitBlock();
+            Long middleValue = splitResult.left;
+            InteriorNode sibling = splitResult.right;
+            InteriorNode leftChild = null;
+
+            // create a new root if necessary
+            if (parent == null)
+            {
+                parent = new InteriorNode();
+                root = parent;
+                sibling.parent = parent;
+                leftChild = this;
+                numBlocks++;
+            }
+
+            parent.add(middleValue, leftChild, sibling);
+
+            return sibling;
+        }
+
+        protected Pair<Long, InteriorNode> splitBlock()
+        {
+            final int splitPosition = TOKENS_PER_BLOCK - 2;
+            InteriorNode sibling = new InteriorNode();
+            sibling.parent = parent;
+            next = sibling;
+
+            Long middleValue = tokens.get(splitPosition);
+
+            for (int i = splitPosition; i < TOKENS_PER_BLOCK; i++)
+            {
+                if (i != TOKENS_PER_BLOCK && i != splitPosition)
+                {
+                    long token = tokens.get(i);
+                    sibling.updateTokenRange(token);
+                    sibling.tokens.add(token);
+                }
+
+                Node child = children.get(i + 1);
+                child.parent = sibling;
+                sibling.children.add(child);
+                sibling.position++;
+            }
+
+            for (int i = TOKENS_PER_BLOCK; i >= splitPosition; i--)
+            {
+                if (i != TOKENS_PER_BLOCK)
+                    tokens.remove(i);
+
+                if (i != splitPosition)
+                    children.remove(i);
+            }
+
+            nodeMinToken = smallestToken();
+            nodeMaxToken = tokens.get(tokens.size() - 1);
+            numBlocks++;
+
+            return Pair.create(middleValue, sibling);
+        }
+
+        protected boolean isFull()
+        {
+            return (position >= TOKENS_PER_BLOCK + 1);
+        }
+
+        private void serializeTokens(ByteBuffer buf)
+        {
+            tokens.forEach(buf::putLong);
+        }
+
+        private void serializeChildOffsets(long childBlockIndex, ByteBuffer buf)
+        {
+            for (int i = 0; i < children.size(); i++)
+                buf.putLong((childBlockIndex + i) * BLOCK_BYTES);
+        }
+    }
+
+    public static class LevelIterator extends AbstractIterator<Node>
+    {
+        private Node currentNode;
+
+        LevelIterator(Node first)
+        {
+            currentNode = first;
+        }
+
+        public Node computeNext()
+        {
+            if (currentNode == null)
+                return endOfData();
+
+            Node returnNode = currentNode;
+            currentNode = returnNode.next;
+
+            return returnNode;
+        }
+    }
+
+
+    protected static void alignBuffer(ByteBuffer buffer, int blockSize)
+    {
+        long curPos = buffer.position();
+        if ((curPos & (blockSize - 1)) != 0) // align on the block boundary if needed
+            buffer.position((int) FBUtilities.align(curPos, blockSize));
+    }
+
+}
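
For readers tracing the entry-packing logic above, a minimal standalone sketch of the
FACTORED round trip (names here are illustrative, not part of the patch):

    public class FactoredOffsetExample
    {
        // encode: the upper 32 of the 48-bit offset go into the int slot,
        // the lower 16 into the short slot (mirrors FactoredOffsetLeafEntry)
        static int offsetData(long offset)    { return (int) (offset >>> Short.SIZE); }
        static short offsetExtra(long offset) { return (short) offset; }

        // decode: recombine, masking the short to avoid sign extension
        static long offset(int data, short extra)
        {
            return (((long) data) << Short.SIZE) | (extra & 0xFFFFL);
        }

        public static void main(String[] args)
        {
            long original = 0x123456789ABCL; // fits in 48 bits, exceeds Integer.MAX_VALUE
            long roundTrip = offset(offsetData(original), offsetExtra(original));
            System.out.println(roundTrip == original); // true
        }
    }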

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
new file mode 100644
index 0000000..2ddfd89
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.Pair;
+
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
+
+public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
+{
+    private final SortedMap<Long, LongSet> tokens = new TreeMap<>();
+
+
+    public DynamicTokenTreeBuilder()
+    {}
+
+    public DynamicTokenTreeBuilder(TokenTreeBuilder data)
+    {
+        add(data);
+    }
+
+    public DynamicTokenTreeBuilder(SortedMap<Long, LongSet> data)
+    {
+        add(data);
+    }
+
+    public void add(Long token, long keyPosition)
+    {
+        LongSet found = tokens.get(token);
+        if (found == null)
+            tokens.put(token, (found = new LongOpenHashSet(2)));
+
+        found.add(keyPosition);
+    }
+
+    public void add(Iterator<Pair<Long, LongSet>> data)
+    {
+        while (data.hasNext())
+        {
+            Pair<Long, LongSet> entry = data.next();
+            for (LongCursor l : entry.right)
+                add(entry.left, l.value);
+        }
+    }
+
+    public void add(SortedMap<Long, LongSet> data)
+    {
+        for (Map.Entry<Long, LongSet> newEntry : data.entrySet())
+        {
+            LongSet found = tokens.get(newEntry.getKey());
+            if (found == null)
+                tokens.put(newEntry.getKey(), (found = new LongOpenHashSet(4)));
+
+            for (LongCursor offset : newEntry.getValue())
+                found.add(offset.value);
+        }
+    }
+
+    public Iterator<Pair<Long, LongSet>> iterator()
+    {
+        final Iterator<Map.Entry<Long, LongSet>> iterator = tokens.entrySet().iterator();
+        return new AbstractIterator<Pair<Long, LongSet>>()
+        {
+            protected Pair<Long, LongSet> computeNext()
+            {
+                if (!iterator.hasNext())
+                    return endOfData();
+
+                Map.Entry<Long, LongSet> entry = iterator.next();
+                return Pair.create(entry.getKey(), entry.getValue());
+            }
+        };
+    }
+
+    public boolean isEmpty()
+    {
+        return tokens.size() == 0;
+    }
+
+    protected void constructTree()
+    {
+        tokenCount = tokens.size();
+        treeMinToken = tokens.firstKey();
+        treeMaxToken = tokens.lastKey();
+        numBlocks = 1;
+
+        // special case the tree that only has a single block in it (so we don't create a useless root)
+        if (tokenCount <= TOKENS_PER_BLOCK)
+        {
+            leftmostLeaf = new DynamicLeaf(tokens);
+            rightmostLeaf = leftmostLeaf;
+            root = leftmostLeaf;
+        }
+        else
+        {
+            root = new InteriorNode();
+            rightmostParent = (InteriorNode) root;
+
+            int i = 0;
+            Leaf lastLeaf = null;
+            Long firstToken = tokens.firstKey();
+            Long finalToken = tokens.lastKey();
+            Long lastToken;
+            for (Long token : tokens.keySet())
+            {
+                if (i == 0 || (i % TOKENS_PER_BLOCK != 0 && i != (tokenCount - 1)))
+                {
+                    i++;
+                    continue;
+                }
+
+                lastToken = token;
+                Leaf leaf = (i != (tokenCount - 1) || token.equals(finalToken)) ?
+                        new DynamicLeaf(tokens.subMap(firstToken, lastToken)) : new DynamicLeaf(tokens.tailMap(firstToken));
+
+                if (i == TOKENS_PER_BLOCK)
+                    leftmostLeaf = leaf;
+                else
+                    lastLeaf.next = leaf;
+
+                rightmostParent.add(leaf);
+                lastLeaf = leaf;
+                rightmostLeaf = leaf;
+                firstToken = lastToken;
+                i++;
+                numBlocks++;
+
+                if (token.equals(finalToken))
+                {
+                    Leaf finalLeaf = new DynamicLeaf(tokens.tailMap(token));
+                    lastLeaf.next = finalLeaf;
+                    rightmostParent.add(finalLeaf);
+                    rightmostLeaf = finalLeaf;
+                    numBlocks++;
+                }
+            }
+
+        }
+    }
+
+    private class DynamicLeaf extends Leaf
+    {
+        private final SortedMap<Long, LongSet> tokens;
+
+        DynamicLeaf(SortedMap<Long, LongSet> data)
+        {
+            super(data.firstKey(), data.lastKey());
+            tokens = data;
+        }
+
+        public int tokenCount()
+        {
+            return tokens.size();
+        }
+
+        public boolean isSerializable()
+        {
+            return true;
+        }
+
+        protected void serializeData(ByteBuffer buf)
+        {
+            for (Map.Entry<Long, LongSet> entry : tokens.entrySet())
+                createEntry(entry.getKey(), entry.getValue()).serialize(buf);
+        }
+
+    }
+}
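
A minimal usage sketch of the dynamic builder's public surface as defined above
(the write target is assumed; DataOutputPlus instances normally come from a
SequentialWriter elsewhere in the codebase):

    import org.apache.cassandra.index.sasi.disk.DynamicTokenTreeBuilder;
    import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;

    public class DynamicBuilderExample
    {
        public static void demo()
        {
            TokenTreeBuilder builder = new DynamicTokenTreeBuilder();
            builder.add(10L, 0L);    // token 10 -> key offset 0
            builder.add(10L, 512L);  // same token, second offset (a collision)
            builder.add(42L, 1024L);

            builder.finish();                     // constructs the tree in memory
            int bytes = builder.serializedSize(); // header + 16 bytes/token for a single-leaf tree
            // builder.write(out);                // out: a DataOutputPlus
        }
    }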

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
index 04b7b1c..c14f76c 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
@@ -162,7 +162,7 @@ public class OnDiskIndexBuilder
         TokenTreeBuilder tokens = terms.get(term);
         if (tokens == null)
         {
-            terms.put(term, (tokens = new TokenTreeBuilder()));
+            terms.put(term, (tokens = new DynamicTokenTreeBuilder()));
 
             // on-heap size estimates from jol
             // 64 bytes for TTB + 48 bytes for TreeMap in TTB + size bytes for the term (map key)
@@ -269,8 +269,8 @@ public class OnDiskIndexBuilder
 
             out.skipBytes((int) (BLOCK_SIZE - out.position()));
 
-            dataLevel = mode == Mode.SPARSE ? new DataBuilderLevel(out, new MutableDataBlock(mode))
-                                            : new MutableLevel<>(out, new MutableDataBlock(mode));
+            dataLevel = mode == Mode.SPARSE ? new DataBuilderLevel(out, new MutableDataBlock(termComparator, mode))
+                                            : new MutableLevel<>(out, new MutableDataBlock(termComparator, mode));
             while (terms.hasNext())
             {
                 Pair<ByteBuffer, TokenTreeBuilder> term = terms.next();
@@ -454,7 +454,7 @@ public class OnDiskIndexBuilder
         public DataBuilderLevel(SequentialWriter out, MutableBlock<InMemoryDataTerm> block)
         {
             super(out, block);
-            superBlockTree = new TokenTreeBuilder();
+            superBlockTree = new DynamicTokenTreeBuilder();
         }
 
         public InMemoryPointerTerm add(InMemoryDataTerm term) throws IOException
@@ -465,20 +465,20 @@ public class OnDiskIndexBuilder
                 dataBlocksCnt++;
                 flushSuperBlock(false);
             }
-            superBlockTree.add(term.keys.getTokens());
+            superBlockTree.add(term.keys);
             return ptr;
         }
 
         public void flushSuperBlock(boolean force) throws IOException
         {
-            if (dataBlocksCnt == SUPER_BLOCK_SIZE || (force && !superBlockTree.getTokens().isEmpty()))
+            if (dataBlocksCnt == SUPER_BLOCK_SIZE || (force && !superBlockTree.isEmpty()))
             {
                 superBlockOffsets.add(out.position());
                 superBlockTree.finish().write(out);
                 alignToBlock(out);
 
                 dataBlocksCnt = 0;
-                superBlockTree = new TokenTreeBuilder();
+                superBlockTree = new DynamicTokenTreeBuilder();
             }
         }
 
@@ -549,28 +549,34 @@ public class OnDiskIndexBuilder
 
     private static class MutableDataBlock extends MutableBlock<InMemoryDataTerm>
     {
+        private static final int MAX_KEYS_SPARSE = 5;
+
+        private final AbstractType<?> comparator;
         private final Mode mode;
 
         private int offset = 0;
-        private int sparseValueTerms = 0;
 
         private final List<TokenTreeBuilder> containers = new ArrayList<>();
         private TokenTreeBuilder combinedIndex;
 
-        public MutableDataBlock(Mode mode)
+        public MutableDataBlock(AbstractType<?> comparator, Mode mode)
         {
+            this.comparator = comparator;
             this.mode = mode;
-            this.combinedIndex = new TokenTreeBuilder();
+            this.combinedIndex = initCombinedIndex();
         }
 
         protected void addInternal(InMemoryDataTerm term) throws IOException
         {
             TokenTreeBuilder keys = term.keys;
 
-            if (mode == Mode.SPARSE && keys.getTokenCount() <= 5)
+            if (mode == Mode.SPARSE)
             {
+                if (keys.getTokenCount() > MAX_KEYS_SPARSE)
+                    throw new IOException(String.format("Term - '%s' belongs to more than %d keys in %s mode, which is not allowed.",
+                                                        comparator.getString(term.term), MAX_KEYS_SPARSE, mode.name()));
+
                 writeTerm(term, keys);
-                sparseValueTerms++;
             }
             else
             {
@@ -581,7 +587,7 @@ public class OnDiskIndexBuilder
             }
 
             if (mode == Mode.SPARSE)
-                combinedIndex.add(keys.getTokens());
+                combinedIndex.add(keys);
         }
 
         protected int sizeAfter(InMemoryDataTerm element)
@@ -593,7 +599,7 @@ public class OnDiskIndexBuilder
         {
             super.flushAndClear(out);
 
-            out.writeInt((sparseValueTerms == 0) ? -1 : offset);
+            out.writeInt(mode == Mode.SPARSE ? offset : -1);
 
             if (containers.size() > 0)
             {
@@ -601,18 +607,15 @@ public class OnDiskIndexBuilder
                     tokens.write(out);
             }
 
-            if (sparseValueTerms > 0)
-            {
+            if (mode == Mode.SPARSE && combinedIndex != null)
                 combinedIndex.finish().write(out);
-            }
 
             alignToBlock(out);
 
             containers.clear();
-            combinedIndex = new TokenTreeBuilder();
+            combinedIndex = initCombinedIndex();
 
             offset = 0;
-            sparseValueTerms = 0;
         }
 
         private int ptrLength(InMemoryDataTerm term)
@@ -626,10 +629,8 @@ public class OnDiskIndexBuilder
         {
             term.serialize(buffer);
             buffer.writeByte((byte) keys.getTokenCount());
-
-            Iterator<Pair<Long, LongSet>> tokens = keys.iterator();
-            while (tokens.hasNext())
-                buffer.writeLong(tokens.next().left);
+            for (Pair<Long, LongSet> key : keys)
+                buffer.writeLong(key.left);
         }
 
         private void writeTerm(InMemoryTerm term, int offset) throws IOException
@@ -638,5 +639,10 @@ public class OnDiskIndexBuilder
             buffer.writeByte(0x0);
             buffer.writeInt(offset);
         }
+
+        private TokenTreeBuilder initCombinedIndex()
+        {
+            return mode == Mode.SPARSE ? new DynamicTokenTreeBuilder() : null;
+        }
     }
 }
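
The SPARSE-mode change above is behavioral as well as structural: a term mapping to
more than MAX_KEYS_SPARSE (5) keys previously fell through to the non-sparse path,
but now fails the flush with an IOException built from the format string above, e.g.
(the term 'foo' is hypothetical):

    java.io.IOException: Term - 'foo' belongs to more than 5 keys in SPARSE mode, which is not allowed.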

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
index 6e63c71..34737ae 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.index.sasi.conf.ColumnIndex;
 import org.apache.cassandra.index.sasi.utils.CombinedTermIterator;
 import org.apache.cassandra.index.sasi.utils.TypeUtil;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
 import org.apache.cassandra.io.util.FileUtils;
@@ -126,7 +127,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
 
             Index index = indexes.get(column);
             if (index == null)
-                indexes.put(column, (index = new Index(columnIndex)));
+                indexes.put(column, (index = newIndex(columnIndex)));
 
             index.add(value.duplicate(), currentKey, currentKeyPosition);
         });
@@ -165,10 +166,18 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
     }
 
     @VisibleForTesting
+    protected Index newIndex(ColumnIndex columnIndex)
+    {
+        return new Index(columnIndex);
+    }
+
+    @VisibleForTesting
     protected class Index
     {
+        @VisibleForTesting
+        protected final String outputFile;
+
         private final ColumnIndex columnIndex;
-        private final String outputFile;
         private final AbstractAnalyzer analyzer;
         private final long maxMemorySize;
 
@@ -245,17 +254,22 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
             final String segmentFile = filename(isFinal);
 
             return () -> {
-                long start1 = System.nanoTime();
+                long start = System.nanoTime();
 
                 try
                 {
                     File index = new File(segmentFile);
                     return builder.finish(index) ? new OnDiskIndex(index, columnIndex.getValidator(), null) : null;
                 }
+                catch (Exception | FSError e)
+                {
+                    logger.error("Failed to build index segment {}", segmentFile, e);
+                    return null;
+                }
                 finally
                 {
                     if (!isFinal)
-                        logger.info("Flushed index segment {}, took {} ms.", segmentFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start1));
+                        logger.info("Flushed index segment {}, took {} ms.", segmentFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
                 }
             };
         }
@@ -290,7 +304,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
 
                     for (Future<OnDiskIndex> f : segments)
                     {
-                        OnDiskIndex part = Futures.getUnchecked(f);
+                        OnDiskIndex part = f.get();
                         if (part == null)
                             continue;
 
@@ -304,7 +318,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
                                    new File(outputFile),
                                    new CombinedTermIterator(parts));
                 }
-                catch (Exception e)
+                catch (Exception | FSError e)
                 {
                     logger.error("Failed to flush index {}.", outputFile, e);
                     FileUtils.delete(outputFile);
@@ -313,13 +327,14 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
                 {
                     logger.info("Index flush to {} took {} ms.", outputFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start1));
 
-                    for (OnDiskIndex part : parts)
+                    for (int segment = 0; segment < segmentNumber; segment++)
                     {
-                        if (part == null)
-                            continue;
+                        OnDiskIndex part = parts[segment];
+
+                        if (part != null)
+                            FileUtils.closeQuietly(part);
 
-                        FileUtils.closeQuietly(part);
-                        FileUtils.delete(part.getIndexPath());
+                        FileUtils.delete(outputFile + "_" + segment);
                     }
 
                     latch.countDown();
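
A note on the widened catch blocks above: FSError extends java.lang.Error (via
IOError), so a plain catch (Exception e) would let filesystem failures escape the
flush thread. A minimal illustration (flushSafely and its Runnable are hypothetical
stand-ins for the patch's segment-flush task):

    import org.apache.cassandra.io.FSError;

    public class SegmentFlushExample
    {
        static void flushSafely(Runnable riskyDiskWrite)
        {
            try
            {
                riskyDiskWrite.run(); // may surface disk failures as FSError, not IOException
            }
            catch (Exception | FSError e)
            {
                // without "| FSError" this handler would miss filesystem errors entirely
                System.err.println("Failed to build index segment: " + e);
            }
        }
    }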

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
new file mode 100644
index 0000000..147427e
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.SortedMap;
+
+import org.apache.cassandra.index.sasi.utils.CombinedTerm;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.Pair;
+
+import com.carrotsearch.hppc.LongSet;
+import com.google.common.collect.Iterators;
+
+/**
+ * Intended usage of this class is to be used in place of {@link DynamicTokenTreeBuilder}
+ * when multiple index segments produced by {@link PerSSTableIndexWriter} are stitched together
+ * by {@link PerSSTableIndexWriter#complete()}.
+ *
+ * This class uses the RangeIterator, now provided by
+ * {@link CombinedTerm#getTokenIterator()}, to iterate the data twice.
+ * The first iteration builds the tree with leaves that contain only enough
+ * information to build the upper layers -- these leaves do not store more
+ * than their minimum and maximum tokens plus their total size, which makes them
+ * non-serializable.
+ *
+ * When the tree is written to disk the final layer is not
+ * written. It's at this point that the data is iterated once again to write
+ * the leaves to disk. This (logarithmically) reduces copying of the
+ * token values while building and writing the upper layers of the tree,
+ * removes the use of SortedMap when combining SAs, and relies instead on the
+ * memory-mapped SAs, greatly improving performance and no
+ * longer causing OOMs when TokenTree sizes are large.
+ *
+ * See https://issues.apache.org/jira/browse/CASSANDRA-11383 for more details.
+ */
+public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder
+{
+    private final CombinedTerm combinedTerm;
+
+    public StaticTokenTreeBuilder(CombinedTerm term)
+    {
+        combinedTerm = term;
+    }
+
+    public void add(Long token, long keyPosition)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public void add(SortedMap<Long, LongSet> data)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public void add(Iterator<Pair<Long, LongSet>> data)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean isEmpty()
+    {
+        return combinedTerm.getTokenIterator().getCount() == 0;
+    }
+
+    public Iterator<Pair<Long, LongSet>> iterator()
+    {
+        Iterator<Token> iterator = combinedTerm.getTokenIterator();
+        return new AbstractIterator<Pair<Long, LongSet>>()
+        {
+            protected Pair<Long, LongSet> computeNext()
+            {
+                if (!iterator.hasNext())
+                    return endOfData();
+
+                Token token = iterator.next();
+                return Pair.create(token.get(), token.getOffsets());
+            }
+        };
+    }
+
+    public long getTokenCount()
+    {
+        return combinedTerm.getTokenIterator().getCount();
+    }
+
+    @Override
+    public void write(DataOutputPlus out) throws IOException
+    {
+        // if the root is not a leaf then none of the leaves have been written (all are PartialLeaf)
+        // so write out the last layer of the tree by converting PartialLeaf to StaticLeaf and
+        // iterating the data once more
+        super.write(out);
+        if (root.isLeaf())
+            return;
+
+        RangeIterator<Long, Token> tokens = combinedTerm.getTokenIterator();
+        ByteBuffer blockBuffer = ByteBuffer.allocate(BLOCK_BYTES);
+        Iterator<Node> leafIterator = leftmostLeaf.levelIterator();
+        while (leafIterator.hasNext())
+        {
+            Leaf leaf = (Leaf) leafIterator.next();
+            Leaf writeableLeaf = new StaticLeaf(Iterators.limit(tokens, leaf.tokenCount()), leaf);
+            writeableLeaf.serialize(-1, blockBuffer);
+            flushBuffer(blockBuffer, out, true);
+        }
+
+    }
+
+    protected void constructTree()
+    {
+        RangeIterator<Long, Token> tokens = combinedTerm.getTokenIterator();
+
+        tokenCount = tokens.getCount();
+        treeMinToken = tokens.getMinimum();
+        treeMaxToken = tokens.getMaximum();
+        numBlocks = 1;
+
+        if (tokenCount <= TOKENS_PER_BLOCK)
+        {
+            leftmostLeaf = new StaticLeaf(tokens, tokens.getMinimum(), tokens.getMaximum(), tokens.getCount(), true);
+            rightmostLeaf = leftmostLeaf;
+            root = leftmostLeaf;
+        }
+        else
+        {
+            root = new InteriorNode();
+            rightmostParent = (InteriorNode) root;
+
+            // build all the leaves except possibly the last,
+            // which may not be completely full.
+            // This loop relies on the fact that multiple index segments
+            // will never have token intersection for a single term,
+            // because it's impossible to encounter the same value for
+            // the same column multiple times in a single key/sstable.
+            Leaf lastLeaf = null;
+            long numFullLeaves = tokenCount / TOKENS_PER_BLOCK;
+            for (long i = 0; i < numFullLeaves; i++)
+            {
+                Long firstToken = tokens.next().get();
+                for (int j = 1; j < (TOKENS_PER_BLOCK - 1); j++)
+                    tokens.next();
+
+                Long lastToken = tokens.next().get();
+                Leaf leaf = new PartialLeaf(firstToken, lastToken, TOKENS_PER_BLOCK);
+
+                if (lastLeaf == null)
+                    leftmostLeaf = leaf;
+                else
+                    lastLeaf.next = leaf;
+
+                rightmostParent.add(leaf);
+                lastLeaf = rightmostLeaf = leaf;
+                numBlocks++;
+            }
+
+            // build the last leaf out of any remaining tokens if necessary
+            // safe downcast since TOKENS_PER_BLOCK is an int
+            int remainingTokens = (int) (tokenCount % TOKENS_PER_BLOCK);
+            if (remainingTokens != 0)
+            {
+                Long firstToken = tokens.next().get();
+                Long lastToken = firstToken;
+                while (tokens.hasNext())
+                    lastToken = tokens.next().get();
+
+                Leaf leaf = new PartialLeaf(firstToken, lastToken, remainingTokens);
+                rightmostParent.add(leaf);
+                lastLeaf.next = rightmostLeaf = leaf;
+                numBlocks++;
+            }
+        }
+    }
+
+    // This denotes the leaf which only has min/max and token counts
+    // but doesn't have any associated data yet, so it can't be serialized.
+    private class PartialLeaf extends Leaf
+    {
+        private final int size;
+
+        public PartialLeaf(Long min, Long max, int count)
+        {
+            super(min, max);
+            size = count;
+        }
+
+        public int tokenCount()
+        {
+            return size;
+        }
+
+        public void serializeData(ByteBuffer buf)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean isSerializable()
+        {
+            return false;
+        }
+    }
+
+    // This denotes the leaf which has been filled with data and is ready to be serialized
+    private class StaticLeaf extends Leaf
+    {
+        private final Iterator<Token> tokens;
+        private final int count;
+        private final boolean isLast;
+
+        public StaticLeaf(Iterator<Token> tokens, Leaf leaf)
+        {
+            this(tokens, leaf.smallestToken(), leaf.largestToken(), leaf.tokenCount(), leaf.isLastLeaf());
+        }
+
+        public StaticLeaf(Iterator<Token> tokens, Long min, Long max, long count, boolean isLastLeaf)
+        {
+            super(min, max);
+
+            this.count = (int) count; // downcast is safe since leaf size is always < Integer.MAX_VALUE
+            this.tokens = tokens;
+            this.isLast = isLastLeaf;
+        }
+
+        public boolean isLastLeaf()
+        {
+            return isLast;
+        }
+
+        public int tokenCount()
+        {
+            return count;
+        }
+
+        public void serializeData(ByteBuffer buf)
+        {
+            while (tokens.hasNext())
+            {
+                Token entry = tokens.next();
+                createEntry(entry.get(), entry.getOffsets()).serialize(buf);
+            }
+        }
+
+        public boolean isSerializable()
+        {
+            return true;
+        }
+    }
+}
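
The two-pass shape described in the class javadoc can be reduced to a toy,
self-contained sketch (names are illustrative, not the patch's API; the token
source must be re-playable, which is the role CombinedTerm#getTokenIterator()
plays in the patch):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.stream.LongStream;

    public class TwoPassSketch
    {
        static final int TOKENS_PER_LEAF = 4; // stand-in for TOKENS_PER_BLOCK

        // pass 1: record only each leaf's min/max/count (cf. PartialLeaf);
        // no token values are retained in RAM
        static List<long[]> shapePass(Iterator<Long> tokens)
        {
            List<long[]> shapes = new ArrayList<>();
            long min = 0, max = 0;
            int n = 0;
            while (tokens.hasNext())
            {
                long t = tokens.next();
                if (n == 0) min = t;
                max = t;
                if (++n == TOKENS_PER_LEAF || !tokens.hasNext())
                {
                    shapes.add(new long[] { min, max, n });
                    n = 0;
                }
            }
            return shapes;
        }

        // pass 2: stream the same tokens again, one leaf's worth at a time,
        // into each leaf as it is written (cf. Iterators.limit + StaticLeaf)
        public static void main(String[] args)
        {
            List<long[]> shapes = shapePass(LongStream.range(0, 10).boxed().iterator());
            Iterator<Long> replay = LongStream.range(0, 10).boxed().iterator();
            for (long[] s : shapes)
            {
                System.out.printf("leaf[min=%d,max=%d,count=%d]:", s[0], s[1], s[2]);
                for (long i = 0; i < s[2]; i++)
                    System.out.print(" " + replay.next());
                System.out.println();
            }
        }
    }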

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Token.java b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
index 02130a3..4cd1ea3 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/Token.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
@@ -18,9 +18,12 @@
 package org.apache.cassandra.index.sasi.disk;
 
 import com.google.common.primitives.Longs;
+
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.index.sasi.utils.CombinedValue;
 
+import com.carrotsearch.hppc.LongSet;
+
 public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKey>
 {
     protected final long token;
@@ -35,6 +38,8 @@ public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKe
         return token;
     }
 
+    public abstract LongSet getOffsets();
+
     public int compareTo(CombinedValue<Long> o)
     {
         return Longs.compare(token, ((Token) o).token);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
index 5d85d00..3f8182d 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
@@ -27,6 +27,8 @@ import org.apache.cassandra.index.sasi.utils.MappedBuffer;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 import org.apache.cassandra.utils.MergeIterator;
 
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
@@ -406,9 +408,9 @@ public class TokenTree
             });
         }
 
-        public Set<Long> getOffsets()
+        public LongSet getOffsets()
         {
-            Set<Long> offsets = new HashSet<>();
+            LongSet offsets = new LongOpenHashSet(4);
             for (TokenInfo i : info)
             {
                 for (long offset : i.fetchOffsets())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
index e10b057..2210964 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
@@ -18,21 +18,26 @@
 package org.apache.cassandra.index.sasi.disk;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
-import com.carrotsearch.hppc.LongArrayList;
 import com.carrotsearch.hppc.LongSet;
-import com.carrotsearch.hppc.cursors.LongCursor;
-import com.carrotsearch.hppc.LongOpenHashSet;
-import com.google.common.collect.AbstractIterator;
 
-public class TokenTreeBuilder
+public interface TokenTreeBuilder extends Iterable<Pair<Long, LongSet>>
 {
+    int BLOCK_BYTES = 4096;
+    int BLOCK_HEADER_BYTES = 64;
+    int OVERFLOW_TRAILER_BYTES = 64;
+    int OVERFLOW_TRAILER_CAPACITY = OVERFLOW_TRAILER_BYTES / 8;
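+    // each serialized leaf entry is 16 bytes (2-byte type + 2-byte extra + 8-byte token + 4-byte offset),
+    // hence (4096 - 64 - 64) / 16 = 248 tokens per block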
+    int TOKENS_PER_BLOCK = (BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / 16;
+    long MAX_OFFSET = (1L << 47) - 1; // 48 bits for (signed) offset
+    byte LAST_LEAF_SHIFT = 1;
+    byte SHARED_HEADER_BYTES = 19;
+    byte ENTRY_TYPE_MASK = 0x03;
+    short AB_MAGIC = 0x5A51;
+
     // note: ordinal positions are used here, do not change order
     enum EntryType
     {
@@ -56,784 +61,16 @@ public class TokenTreeBuilder
         }
     }
 
-    public static final int BLOCK_BYTES = 4096;
-    public static final int BLOCK_HEADER_BYTES = 64;
-    public static final int OVERFLOW_TRAILER_BYTES = 64;
-    public static final int OVERFLOW_TRAILER_CAPACITY = OVERFLOW_TRAILER_BYTES / 8;
-    public static final int TOKENS_PER_BLOCK = (BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / 16;
-    public static final long MAX_OFFSET = (1L << 47) - 1; // 48 bits for (signed) offset
-    public static final byte LAST_LEAF_SHIFT = 1;
-    public static final byte SHARED_HEADER_BYTES = 19;
-    public static final byte ENTRY_TYPE_MASK = 0x03;
-    public static final short AB_MAGIC = 0x5A51;
-
-    private final SortedMap<Long, LongSet> tokens = new TreeMap<>();
-    private int numBlocks;
-
-    private Node root;
-    private InteriorNode rightmostParent;
-    private Leaf leftmostLeaf;
-    private Leaf rightmostLeaf;
-    private long tokenCount = 0;
-    private long treeMinToken;
-    private long treeMaxToken;
-
-    public TokenTreeBuilder()
-    {}
-
-    public TokenTreeBuilder(SortedMap<Long, LongSet> data)
-    {
-        add(data);
-    }
-
-    public void add(Long token, long keyPosition)
-    {
-        LongSet found = tokens.get(token);
-        if (found == null)
-            tokens.put(token, (found = new LongOpenHashSet(2)));
-
-        found.add(keyPosition);
-    }
-
-    public void add(SortedMap<Long, LongSet> data)
-    {
-        for (Map.Entry<Long, LongSet> newEntry : data.entrySet())
-        {
-            LongSet found = tokens.get(newEntry.getKey());
-            if (found == null)
-                tokens.put(newEntry.getKey(), (found = new LongOpenHashSet(4)));
-
-            for (LongCursor offset : newEntry.getValue())
-                found.add(offset.value);
-        }
-    }
-
-    public TokenTreeBuilder finish()
-    {
-        maybeBulkLoad();
-        return this;
-    }
-
-    public SortedMap<Long, LongSet> getTokens()
-    {
-        return tokens;
-    }
-
-    public long getTokenCount()
-    {
-        return tokenCount;
-    }
-
-    public int serializedSize()
-    {
-        if (numBlocks == 1)
-            return (BLOCK_HEADER_BYTES + ((int) tokenCount * 16));
-        else
-            return numBlocks * BLOCK_BYTES;
-    }
-
-    public void write(DataOutputPlus out) throws IOException
-    {
-        ByteBuffer blockBuffer = ByteBuffer.allocate(BLOCK_BYTES);
-        Iterator<Node> levelIterator = root.levelIterator();
-        long childBlockIndex = 1;
-
-        while (levelIterator != null)
-        {
-
-            Node firstChild = null;
-            while (levelIterator.hasNext())
-            {
-                Node block = levelIterator.next();
-
-                if (firstChild == null && !block.isLeaf())
-                    firstChild = ((InteriorNode) block).children.get(0);
-
-                block.serialize(childBlockIndex, blockBuffer);
-                flushBuffer(blockBuffer, out, numBlocks != 1);
-
-                childBlockIndex += block.childCount();
-            }
-
-            levelIterator = (firstChild == null) ? null : firstChild.levelIterator();
-        }
-    }
-
-    public Iterator<Pair<Long, LongSet>> iterator()
-    {
-        return new TokenIterator(leftmostLeaf.levelIterator());
-    }
-
-    private void maybeBulkLoad()
-    {
-        if (root == null)
-            bulkLoad();
-    }
-
-    private void flushBuffer(ByteBuffer buffer, DataOutputPlus o, boolean align) throws IOException
-    {
-        // seek to end of last block before flushing
-        if (align)
-            alignBuffer(buffer, BLOCK_BYTES);
-
-        buffer.flip();
-        o.write(buffer);
-        buffer.clear();
-    }
-
-    private static void alignBuffer(ByteBuffer buffer, int blockSize)
-    {
-        long curPos = buffer.position();
-        if ((curPos & (blockSize - 1)) != 0) // align on the block boundary if needed
-            buffer.position((int) FBUtilities.align(curPos, blockSize));
-    }
-
-    private void bulkLoad()
-    {
-        tokenCount = tokens.size();
-        treeMinToken = tokens.firstKey();
-        treeMaxToken = tokens.lastKey();
-        numBlocks = 1;
-
-        // special case the tree that only has a single block in it (so we don't create a useless root)
-        if (tokenCount <= TOKENS_PER_BLOCK)
-        {
-            leftmostLeaf = new Leaf(tokens);
-            rightmostLeaf = leftmostLeaf;
-            root = leftmostLeaf;
-        }
-        else
-        {
-            root = new InteriorNode();
-            rightmostParent = (InteriorNode) root;
-
-            int i = 0;
-            Leaf lastLeaf = null;
-            Long firstToken = tokens.firstKey();
-            Long finalToken = tokens.lastKey();
-            Long lastToken;
-            for (Long token : tokens.keySet())
-            {
-                if (i == 0 || (i % TOKENS_PER_BLOCK != 0 && i != (tokenCount - 1)))
-                {
-                    i++;
-                    continue;
-                }
-
-                lastToken = token;
-                Leaf leaf = (i != (tokenCount - 1) || token.equals(finalToken)) ?
-                        new Leaf(tokens.subMap(firstToken, lastToken)) : new Leaf(tokens.tailMap(firstToken));
-
-                if (i == TOKENS_PER_BLOCK)
-                    leftmostLeaf = leaf;
-                else
-                    lastLeaf.next = leaf;
-
-                rightmostParent.add(leaf);
-                lastLeaf = leaf;
-                rightmostLeaf = leaf;
-                firstToken = lastToken;
-                i++;
-                numBlocks++;
-
-                if (token.equals(finalToken))
-                {
-                    Leaf finalLeaf = new Leaf(tokens.tailMap(token));
-                    lastLeaf.next = finalLeaf;
-                    rightmostParent.add(finalLeaf);
-                    rightmostLeaf = finalLeaf;
-                    numBlocks++;
-                }
-            }
-
-        }
-    }
-
-    private abstract class Node
-    {
-        protected InteriorNode parent;
-        protected Node next;
-        protected Long nodeMinToken, nodeMaxToken;
-
-        public abstract void serialize(long childBlockIndex, ByteBuffer buf);
-        public abstract int childCount();
-        public abstract int tokenCount();
-        public abstract Long smallestToken();
-
-        public Iterator<Node> levelIterator()
-        {
-            return new LevelIterator(this);
-        }
-
-        public boolean isLeaf()
-        {
-            return (this instanceof Leaf);
-        }
-
-        protected boolean isLastLeaf()
-        {
-            return this == rightmostLeaf;
-        }
-
-        protected boolean isRoot()
-        {
-            return this == root;
-        }
-
-        protected void updateTokenRange(long token)
-        {
-            nodeMinToken = nodeMinToken == null ? token : Math.min(nodeMinToken, token);
-            nodeMaxToken = nodeMaxToken == null ? token : Math.max(nodeMaxToken, token);
-        }
-
-        protected void serializeHeader(ByteBuffer buf)
-        {
-            Header header;
-            if (isRoot())
-                header = new RootHeader();
-            else if (!isLeaf())
-                header = new InteriorNodeHeader();
-            else
-                header = new LeafHeader();
-
-            header.serialize(buf);
-            alignBuffer(buf, BLOCK_HEADER_BYTES);
-        }
-
-        private abstract class Header
-        {
-            public void serialize(ByteBuffer buf)
-            {
-                buf.put(infoByte())
-                        .putShort((short) (tokenCount()))
-                        .putLong(nodeMinToken)
-                        .putLong(nodeMaxToken);
-            }
-
-            protected abstract byte infoByte();
-        }
-
-        private class RootHeader extends Header
-        {
-            public void serialize(ByteBuffer buf)
-            {
-                super.serialize(buf);
-                writeMagic(buf);
-                buf.putLong(tokenCount)
-                        .putLong(treeMinToken)
-                        .putLong(treeMaxToken);
-            }
-
-            protected byte infoByte()
-            {
-                // if leaf, set leaf indicator and last leaf indicator (bits 0 & 1)
-                // if not leaf, clear both bits
-                return (byte) ((isLeaf()) ? 3 : 0);
-            }
-
-            protected void writeMagic(ByteBuffer buf)
-            {
-                switch (Descriptor.CURRENT_VERSION)
-                {
-                    case Descriptor.VERSION_AB:
-                        buf.putShort(AB_MAGIC);
-                        break;
-                    default:
-                        break;
-                }
-
-            }
-        }
-
-        private class InteriorNodeHeader extends Header
-        {
-            // bit 0 (leaf indicator) & bit 1 (last leaf indicator) cleared
-            protected byte infoByte()
-            {
-                return 0;
-            }
-        }
-
-        private class LeafHeader extends Header
-        {
-            // bit 0 set as leaf indicator
-            // bit 1 set if this is last leaf of data
-            protected byte infoByte()
-            {
-                byte infoByte = 1;
-                infoByte |= (isLastLeaf()) ? (1 << LAST_LEAF_SHIFT) : 0;
-
-                return infoByte;
-            }
-        }
-
-    }
-
-    private class Leaf extends Node
-    {
-        private final SortedMap<Long, LongSet> tokens;
-        private LongArrayList overflowCollisions;
-
-        Leaf(SortedMap<Long, LongSet> data)
-        {
-            nodeMinToken = data.firstKey();
-            nodeMaxToken = data.lastKey();
-            tokens = data;
-        }
-
-        public Long largestToken()
-        {
-            return nodeMaxToken;
-        }
-
-        public void serialize(long childBlockIndex, ByteBuffer buf)
-        {
-            serializeHeader(buf);
-            serializeData(buf);
-            serializeOverflowCollisions(buf);
-        }
-
-        public int childCount()
-        {
-            return 0;
-        }
-
-        public int tokenCount()
-        {
-            return tokens.size();
-        }
-
-        public Long smallestToken()
-        {
-            return nodeMinToken;
-        }
-
-        public Iterator<Map.Entry<Long, LongSet>> tokenIterator()
-        {
-            return tokens.entrySet().iterator();
-        }
-
-        private void serializeData(ByteBuffer buf)
-        {
-            for (Map.Entry<Long, LongSet> entry : tokens.entrySet())
-                createEntry(entry.getKey(), entry.getValue()).serialize(buf);
-        }
-
-        private void serializeOverflowCollisions(ByteBuffer buf)
-        {
-            if (overflowCollisions != null)
-                for (LongCursor offset : overflowCollisions)
-                    buf.putLong(offset.value);
-        }
-
-
-        private LeafEntry createEntry(final long tok, final LongSet offsets)
-        {
-            int offsetCount = offsets.size();
-            switch (offsetCount)
-            {
-                case 0:
-                    throw new AssertionError("no offsets for token " + tok);
-                case 1:
-                    long offset = offsets.toArray()[0];
-                    if (offset > MAX_OFFSET)
-                        throw new AssertionError("offset " + offset + " cannot be greater than " + MAX_OFFSET);
-                    else if (offset <= Integer.MAX_VALUE)
-                        return new SimpleLeafEntry(tok, offset);
-                    else
-                        return new FactoredOffsetLeafEntry(tok, offset);
-                case 2:
-                    long[] rawOffsets = offsets.toArray();
-                    if (rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= Integer.MAX_VALUE &&
-                            (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= Short.MAX_VALUE))
-                        return new PackedCollisionLeafEntry(tok, rawOffsets);
-                    else
-                        return createOverflowEntry(tok, offsetCount, offsets);
-                default:
-                    return createOverflowEntry(tok, offsetCount, offsets);
-            }
-        }
-
-        private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongSet offsets)
-        {
-            if (overflowCollisions == null)
-                overflowCollisions = new LongArrayList();
-
-            LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) overflowCollisions.size(), (short) offsetCount);
-            for (LongCursor o : offsets) {
-                if (overflowCollisions.size() == OVERFLOW_TRAILER_CAPACITY)
-                    throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf");
-                else
-                    overflowCollisions.add(o.value);
-            }
-            return entry;
-        }
-
-        private abstract class LeafEntry
-        {
-            protected final long token;
-
-            abstract public EntryType type();
-            abstract public int offsetData();
-            abstract public short offsetExtra();
-
-            public LeafEntry(final long tok)
-            {
-                token = tok;
-            }
-
-            public void serialize(ByteBuffer buf)
-            {
-                buf.putShort((short) type().ordinal())
-                        .putShort(offsetExtra())
-                        .putLong(token)
-                        .putInt(offsetData());
-            }
-
-        }
-
-
-        // assumes there is a single offset and the offset is <= Integer.MAX_VALUE
-        private class SimpleLeafEntry extends LeafEntry
-        {
-            private final long offset;
-
-            public SimpleLeafEntry(final long tok, final long off)
-            {
-                super(tok);
-                offset = off;
-            }
-
-            public EntryType type()
-            {
-                return EntryType.SIMPLE;
-            }
-
-            public int offsetData()
-            {
-                return (int) offset;
-            }
-
-            public short offsetExtra()
-            {
-                return 0;
-            }
-        }
-
-        // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET
-        // take the middle 32 bits of offset (or the top 32 when considering offset is max 48 bits)
-        // and store where offset is normally stored. take bottom 16 bits of offset and store in entry header
-        private class FactoredOffsetLeafEntry extends LeafEntry
-        {
-            private final long offset;
-
-            public FactoredOffsetLeafEntry(final long tok, final long off)
-            {
-                super(tok);
-                offset = off;
-            }
-
-            public EntryType type()
-            {
-                return EntryType.FACTORED;
-            }
-
-            public int offsetData()
-            {
-                return (int) (offset >>> Short.SIZE);
-            }
-
-            public short offsetExtra()
-            {
-                return (short) offset;
-            }
-        }
-
-        // holds an entry with two offsets that can be packed in an int & a short
-        // the int offset is stored where offset is normally stored. short offset is
-        // stored in entry header
-        private class PackedCollisionLeafEntry extends LeafEntry
-        {
-            private short smallerOffset;
-            private int largerOffset;
-
-            public PackedCollisionLeafEntry(final long tok, final long[] offs)
-            {
-                super(tok);
-
-                smallerOffset = (short) Math.min(offs[0], offs[1]);
-                largerOffset = (int) Math.max(offs[0], offs[1]);
-            }
-
-            public EntryType type()
-            {
-                return EntryType.PACKED;
-            }
-
-            public int offsetData()
-            {
-                return largerOffset;
-            }
-
-            public short offsetExtra()
-            {
-                return smallerOffset;
-            }
-        }
-
-        // holds an entry with three or more offsets, or two offsets that cannot
-        // be packed into an int & a short. the index into the overflow list
-        // is stored where the offset is normally stored. the number of overflowed offsets
-        // for the entry is stored in the entry header
-        private class OverflowCollisionLeafEntry extends LeafEntry
-        {
-            private final short startIndex;
-            private final short count;
-
-            public OverflowCollisionLeafEntry(final long tok, final short collisionStartIndex, final short collisionCount)
-            {
-                super(tok);
-                startIndex = collisionStartIndex;
-                count = collisionCount;
-            }
-
-            public EntryType type()
-            {
-                return EntryType.OVERFLOW;
-            }
-
-            public int offsetData()
-            {
-                return startIndex;
-            }
-
-            public short offsetExtra()
-            {
-                return count;
-            }
-
-        }
-
-    }
-
-    private class InteriorNode extends Node
-    {
-        private List<Long> tokens = new ArrayList<>(TOKENS_PER_BLOCK);
-        private List<Node> children = new ArrayList<>(TOKENS_PER_BLOCK + 1);
-        private int position = 0; // TODO (jwest): can get rid of this and use array size
-
-
-        public void serialize(long childBlockIndex, ByteBuffer buf)
-        {
-            serializeHeader(buf);
-            serializeTokens(buf);
-            serializeChildOffsets(childBlockIndex, buf);
-        }
+    void add(Long token, long keyPosition);
+    void add(SortedMap<Long, LongSet> data);
+    void add(Iterator<Pair<Long, LongSet>> data);
+    void add(TokenTreeBuilder ttb);
 
-        public int childCount()
-        {
-            return children.size();
-        }
+    boolean isEmpty();
+    long getTokenCount();
 
-        public int tokenCount()
-        {
-            return tokens.size();
-        }
+    TokenTreeBuilder finish();
 
-        public Long smallestToken()
-        {
-            return tokens.get(0);
-        }
-
-        protected void add(Long token, InteriorNode leftChild, InteriorNode rightChild)
-        {
-            int pos = tokens.size();
-            if (pos == TOKENS_PER_BLOCK)
-            {
-                InteriorNode sibling = split();
-                sibling.add(token, leftChild, rightChild);
-
-            }
-            else {
-                if (leftChild != null)
-                    children.add(pos, leftChild);
-
-                if (rightChild != null)
-                {
-                    children.add(pos + 1, rightChild);
-                    rightChild.parent = this;
-                }
-
-                updateTokenRange(token);
-                tokens.add(pos, token);
-            }
-        }
-
-        protected void add(Leaf node)
-        {
-
-            if (position == (TOKENS_PER_BLOCK + 1))
-            {
-                rightmostParent = split();
-                rightmostParent.add(node);
-            }
-            else
-            {
-
-                node.parent = this;
-                children.add(position, node);
-                position++;
-
-                // the first child is referenced only during bulk load. we don't take a value
-                // to store into the tree, one is subtracted since position has already been incremented
-                // for the next node to be added
-                if (position - 1 == 0)
-                    return;
-
-
-                // tokens are inserted one behind the current position, but 2 is subtracted because
-                // position has already been incremented for the next add
-                Long smallestToken = node.smallestToken();
-                updateTokenRange(smallestToken);
-                tokens.add(position - 2, smallestToken);
-            }
-
-        }
-
-        protected InteriorNode split()
-        {
-            Pair<Long, InteriorNode> splitResult = splitBlock();
-            Long middleValue = splitResult.left;
-            InteriorNode sibling = splitResult.right;
-            InteriorNode leftChild = null;
-
-            // create a new root if necessary
-            if (parent == null)
-            {
-                parent = new InteriorNode();
-                root = parent;
-                sibling.parent = parent;
-                leftChild = this;
-                numBlocks++;
-            }
-
-            parent.add(middleValue, leftChild, sibling);
-
-            return sibling;
-        }
-
-        protected Pair<Long, InteriorNode> splitBlock()
-        {
-            final int splitPosition = TOKENS_PER_BLOCK - 2;
-            InteriorNode sibling = new InteriorNode();
-            sibling.parent = parent;
-            next = sibling;
-
-            Long middleValue = tokens.get(splitPosition);
-
-            for (int i = splitPosition; i < TOKENS_PER_BLOCK; i++)
-            {
-                if (i != TOKENS_PER_BLOCK && i != splitPosition)
-                {
-                    long token = tokens.get(i);
-                    sibling.updateTokenRange(token);
-                    sibling.tokens.add(token);
-                }
-
-                Node child = children.get(i + 1);
-                child.parent = sibling;
-                sibling.children.add(child);
-                sibling.position++;
-            }
-
-            for (int i = TOKENS_PER_BLOCK; i >= splitPosition; i--)
-            {
-                if (i != TOKENS_PER_BLOCK)
-                    tokens.remove(i);
-
-                if (i != splitPosition)
-                    children.remove(i);
-            }
-
-            nodeMinToken = smallestToken();
-            nodeMaxToken = tokens.get(tokens.size() - 1);
-            numBlocks++;
-
-            return Pair.create(middleValue, sibling);
-        }
-
-        protected boolean isFull()
-        {
-            return (position >= TOKENS_PER_BLOCK + 1);
-        }
-
-        private void serializeTokens(ByteBuffer buf)
-        {
-            for (Long token : tokens)
-                buf.putLong(token);
-        }
-
-
-        private void serializeChildOffsets(long childBlockIndex, ByteBuffer buf)
-        {
-            for (int i = 0; i < children.size(); i++)
-                buf.putLong((childBlockIndex + i) * BLOCK_BYTES);
-        }
-    }
-
-    public static class LevelIterator extends AbstractIterator<Node>
-    {
-        private Node currentNode;
-
-        LevelIterator(Node first)
-        {
-            currentNode = first;
-        }
-
-        public Node computeNext()
-        {
-            if (currentNode == null)
-                return endOfData();
-
-            Node returnNode = currentNode;
-            currentNode = returnNode.next;
-
-            return returnNode;
-        }
-
-
-    }
-
-    public static class TokenIterator extends AbstractIterator<Pair<Long, LongSet>>
-    {
-        private Iterator<Node> levelIterator;
-        private Iterator<Map.Entry<Long, LongSet>> currentIterator;
-
-        TokenIterator(Iterator<Node> level)
-        {
-            levelIterator = level;
-            if (levelIterator.hasNext())
-                currentIterator = ((Leaf) levelIterator.next()).tokenIterator();
-        }
-
-        public Pair<Long, LongSet> computeNext()
-        {
-            if (currentIterator != null && currentIterator.hasNext())
-            {
-                Map.Entry<Long, LongSet> next = currentIterator.next();
-                return Pair.create(next.getKey(), next.getValue());
-            }
-            else
-            {
-                if (!levelIterator.hasNext())
-                    return endOfData();
-                else
-                {
-                    currentIterator = ((Leaf) levelIterator.next()).tokenIterator();
-                    return computeNext();
-                }
-            }
-
-        }
-    }
+    int serializedSize();
+    void write(DataOutputPlus out) throws IOException;
 }
\ No newline at end of file
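
With the hunk above, TokenTreeBuilder shrinks to an interface (an Iterable over Pair<Long, LongSet>) plus the shared layout constants and EntryType enum; tree construction itself moves to the implementations used in the hunks below, DynamicTokenTreeBuilder for incremental building and StaticTokenTreeBuilder for merging already-built trees. The leaf-entry encoding that moved out with it is worth a worked example: a lone offset that fits in an int is written as a SIMPLE entry, while a lone offset up to the 48-bit MAX_OFFSET becomes FACTORED, its upper 32 bits in the entry's int slot and its low 16 bits in the short slot. A self-contained sketch of that round trip, mirroring offsetData()/offsetExtra() above (decode() is an illustrative reconstruction, not the TokenTree reader):

    public class FactoredOffsetSketch
    {
        static final long MAX_OFFSET = (1L << 47) - 1; // as in the interface above

        // upper 32 of the 48 offset bits -> the entry's int slot
        static int offsetData(long offset)
        {
            return (int) (offset >>> Short.SIZE);
        }

        // low 16 bits -> the entry's short slot
        static short offsetExtra(long offset)
        {
            return (short) offset;
        }

        // recombine, masking the short against sign extension
        static long decode(int data, short extra)
        {
            return (((long) data) << Short.SIZE) | (extra & 0xFFFFL);
        }

        public static void main(String[] args)
        {
            long offset = (1L << 40) + 12345L; // > Integer.MAX_VALUE, <= MAX_OFFSET
            System.out.println(decode(offsetData(offset), offsetExtra(offset)) == offset); // true
        }
    }

PACKED is the two-offset analogue (one int-sized and one short-sized offset packed into a single entry), and anything larger spills into the leaf's overflow trailer, which is why createOverflowEntry above caps collisions at OVERFLOW_TRAILER_CAPACITY per leaf.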

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
index 293e2ee..a2f2c0e 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
@@ -29,6 +29,8 @@ import org.apache.cassandra.index.sasi.utils.AbstractIterator;
 import org.apache.cassandra.index.sasi.utils.CombinedValue;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
 import com.google.common.collect.PeekingIterator;
 
 public class KeyRangeIterator extends RangeIterator<Long, Token>
@@ -91,6 +93,15 @@ public class KeyRangeIterator extends RangeIterator<Long, Token>
             }};
         }
 
+        public LongSet getOffsets()
+        {
+            LongSet offsets = new LongOpenHashSet(4);
+            for (DecoratedKey key : keys)
+                offsets.add((long) key.getToken().getTokenValue());
+
+            return offsets;
+        }
+
         public void merge(CombinedValue<Long> other)
         {
             if (!(other instanceof Token))
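
The getOffsets() added here gives the in-memory Token the same primitive view of its keys that TokenTree.OnDiskToken gained above, so both sides of the index can be consumed uniformly; the cast relies on token values being longs, as they are under Murmur3Partitioner (the partitioner SASI requires). Callers read the result either through hppc cursors or as a raw array, as elsewhere in the patch; a brief illustrative sketch:

    import com.carrotsearch.hppc.LongOpenHashSet;
    import com.carrotsearch.hppc.LongSet;
    import com.carrotsearch.hppc.cursors.LongCursor;

    public class OffsetsConsumerSketch
    {
        public static void main(String[] args)
        {
            LongSet offsets = new LongOpenHashSet(4);
            offsets.add(42L);
            offsets.add(42L); // duplicates collapse
            offsets.add(7L);

            // hppc sets iterate through cursors rather than boxed Longs
            for (LongCursor cursor : offsets)
                System.out.println(cursor.value);

            // or drain to a primitive array, as the updated tests below do
            System.out.println(offsets.toArray().length); // prints 2
        }
    }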

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java b/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java
index 63f6c5b..592299e 100644
--- a/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java
+++ b/src/java/org/apache/cassandra/index/sasi/sa/SuffixSA.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.index.sasi.sa;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 
+import org.apache.cassandra.index.sasi.disk.DynamicTokenTreeBuilder;
 import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
 import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -116,19 +117,19 @@ public class SuffixSA extends SA<CharBuffer>
                 if (lastProcessedSuffix == null)
                 {
                     lastProcessedSuffix = suffix.left;
-                    container = new TokenTreeBuilder(suffix.right.getTokens());
+                    container = new DynamicTokenTreeBuilder(suffix.right);
                 }
                 else if (comparator.compare(lastProcessedSuffix, suffix.left) == 0)
                 {
                     lastProcessedSuffix = suffix.left;
-                    container.add(suffix.right.getTokens());
+                    container.add(suffix.right);
                 }
                 else
                 {
                     Pair<ByteBuffer, TokenTreeBuilder> result = finishSuffix();
 
                     lastProcessedSuffix = suffix.left;
-                    container = new TokenTreeBuilder(suffix.right.getTokens());
+                    container = new DynamicTokenTreeBuilder(suffix.right);
 
                     return result;
                 }
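
SuffixSA no longer drains each term's token map into a fresh builder; it seeds a DynamicTokenTreeBuilder straight from the source builder and folds duplicate suffixes in through the interface's add(TokenTreeBuilder) overload. A minimal sketch of that merge contract, assuming only the constructor and methods visible in this diff:

    import org.apache.cassandra.index.sasi.disk.DynamicTokenTreeBuilder;
    import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;

    public class BuilderMergeSketch
    {
        // Seed from one builder, fold in another, then freeze the tree;
        // finish() must run before serializedSize()/write().
        public static TokenTreeBuilder mergeOf(TokenTreeBuilder a, TokenTreeBuilder b)
        {
            TokenTreeBuilder merged = new DynamicTokenTreeBuilder(a);
            merged.add(b);
            return merged.finish();
        }
    }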

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java b/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java
index 2bf5a07..ba7123a 100644
--- a/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java
+++ b/src/java/org/apache/cassandra/index/sasi/utils/CombinedTerm.java
@@ -18,42 +18,22 @@
 package org.apache.cassandra.index.sasi.utils;
 
 import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.*;
 
+import org.apache.cassandra.index.sasi.disk.*;
 import org.apache.cassandra.index.sasi.disk.OnDiskIndex.DataTerm;
-import org.apache.cassandra.index.sasi.disk.Token;
-import org.apache.cassandra.index.sasi.disk.TokenTree;
-import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder;
 import org.apache.cassandra.db.marshal.AbstractType;
 
-import com.carrotsearch.hppc.LongOpenHashSet;
-import com.carrotsearch.hppc.LongSet;
-import com.carrotsearch.hppc.cursors.LongCursor;
-
 public class CombinedTerm implements CombinedValue<DataTerm>
 {
     private final AbstractType<?> comparator;
     private final DataTerm term;
-    private final TreeMap<Long, LongSet> tokens;
+    private final List<DataTerm> mergedTerms = new ArrayList<>();
 
     public CombinedTerm(AbstractType<?> comparator, DataTerm term)
     {
         this.comparator = comparator;
         this.term = term;
-        this.tokens = new TreeMap<>();
-
-        RangeIterator<Long, Token> tokens = term.getTokens();
-        while (tokens.hasNext())
-        {
-            Token current = tokens.next();
-            LongSet offsets = this.tokens.get(current.get());
-            if (offsets == null)
-                this.tokens.put(current.get(), (offsets = new LongOpenHashSet()));
-
-            for (Long offset : ((TokenTree.OnDiskToken) current).getOffsets())
-                offsets.add(offset);
-        }
     }
 
     public ByteBuffer getTerm()
@@ -61,14 +41,18 @@ public class CombinedTerm implements CombinedValue<DataTerm>
         return term.getTerm();
     }
 
-    public Map<Long, LongSet> getTokens()
+    public RangeIterator<Long, Token> getTokenIterator()
     {
-        return tokens;
+        RangeIterator.Builder<Long, Token> union = RangeUnionIterator.builder();
+        union.add(term.getTokens());
+        mergedTerms.stream().map(OnDiskIndex.DataTerm::getTokens).forEach(union::add);
+
+        return union.build();
     }
 
     public TokenTreeBuilder getTokenTreeBuilder()
     {
-        return new TokenTreeBuilder(tokens).finish();
+        return new StaticTokenTreeBuilder(this).finish();
     }
 
     public void merge(CombinedValue<DataTerm> other)
@@ -80,15 +64,7 @@ public class CombinedTerm implements CombinedValue<DataTerm>
 
         assert comparator == o.comparator;
 
-        for (Map.Entry<Long, LongSet> token : o.tokens.entrySet())
-        {
-            LongSet offsets = this.tokens.get(token.getKey());
-            if (offsets == null)
-                this.tokens.put(token.getKey(), (offsets = new LongOpenHashSet()));
-
-            for (LongCursor offset : token.getValue())
-                offsets.add(offset.value);
-        }
+        mergedTerms.add(o.term);
     }
 
     public DataTerm get()
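
The CombinedTerm change above is the heart of the fix. The old constructor eagerly copied every merged term's tokens into one TreeMap<Long, LongSet>, so merging the per-segment indexes of a big SSTable stitched them together in RAM; the new version only records the merged DataTerms and exposes a lazy RangeUnionIterator, which StaticTokenTreeBuilder can stream to disk. The underlying idea is a k-way union over sorted streams that buffers just one head element per source; a self-contained sketch with plain java.util stand-ins (RangeUnionIterator itself is richer):

    import java.util.*;

    public class LazyUnionSketch
    {
        private static final class Source
        {
            final Iterator<Long> it;
            long head;

            Source(Iterator<Long> it)
            {
                this.it = it;
                this.head = it.next();
            }

            boolean advance()
            {
                if (!it.hasNext())
                    return false;
                head = it.next();
                return true;
            }
        }

        // k-way union: nothing is materialized beyond the heap of source heads,
        // so memory stays constant no matter how large the inputs are.
        static Iterator<Long> union(List<Iterator<Long>> inputs)
        {
            PriorityQueue<Source> heap = new PriorityQueue<>(Comparator.comparingLong((Source s) -> s.head));
            for (Iterator<Long> input : inputs)
                if (input.hasNext())
                    heap.add(new Source(input));

            return new Iterator<Long>()
            {
                public boolean hasNext()
                {
                    return !heap.isEmpty();
                }

                public Long next()
                {
                    Source top = heap.poll();
                    long value = top.head;
                    if (top.advance())
                        heap.add(top);

                    // collapse the same value arriving from other sources
                    while (!heap.isEmpty() && heap.peek().head == value)
                    {
                        Source dup = heap.poll();
                        if (dup.advance())
                            heap.add(dup);
                    }
                    return value;
                }
            };
        }

        public static void main(String[] args)
        {
            Iterator<Long> merged = union(Arrays.asList(Arrays.asList(1L, 3L, 5L).iterator(),
                                                        Arrays.asList(2L, 3L, 6L).iterator()));
            merged.forEachRemaining(System.out::println); // 1 2 3 5 6
        }
    }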

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c4d5c73/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
index 6353155..628bd36 100644
--- a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
@@ -605,6 +605,12 @@ public class OnDiskIndexTest
         }
     }
 
+    public void putAll(SortedMap<Long, LongSet> offsets, TokenTreeBuilder ttb)
+    {
+        for (Pair<Long, LongSet> entry : ttb)
+            offsets.put(entry.left, entry.right);
+    }
+
     @Test
     public void testCombiningOfThePartitionedSA() throws Exception
     {
@@ -620,7 +626,7 @@ public class OnDiskIndexTest
                 expected.put(i, (offsets = new TreeMap<>()));
 
             builderA.add(LongType.instance.decompose(i), keyAt(i), i);
-            offsets.putAll(keyBuilder(i).getTokens());
+            putAll(offsets, keyBuilder(i));
         }
 
         for (long i = 50; i < 100; i++)
@@ -631,7 +637,7 @@ public class OnDiskIndexTest
 
             long position = 100L + i;
             builderB.add(LongType.instance.decompose(i), keyAt(position), position);
-            offsets.putAll(keyBuilder(100L + i).getTokens());
+            putAll(offsets, keyBuilder(100L + i));
         }
 
         File indexA = File.createTempFile("on-disk-sa-partition-a", ".db");
@@ -659,7 +665,7 @@ public class OnDiskIndexTest
             if (offsets == null)
                 actual.put(composedTerm, (offsets = new TreeMap<>()));
 
-            offsets.putAll(term.getTokens());
+            putAll(offsets, term.getTokenTreeBuilder());
         }
 
         Assert.assertEquals(actual, expected);
@@ -684,7 +690,7 @@ public class OnDiskIndexTest
             if (offsets == null)
                 actual.put(composedTerm, (offsets = new TreeMap<>()));
 
-            offsets.putAll(term.getTokens());
+            putAll(offsets, term.getTokenTreeBuilder());
         }
 
         Assert.assertEquals(actual, expected);
@@ -735,7 +741,7 @@ public class OnDiskIndexTest
 
     private static TokenTreeBuilder keyBuilder(Long... keys)
     {
-        TokenTreeBuilder builder = new TokenTreeBuilder();
+        TokenTreeBuilder builder = new DynamicTokenTreeBuilder();
 
         for (final Long key : keys)
         {
@@ -850,9 +856,9 @@ public class OnDiskIndexTest
 
     private static void addAll(OnDiskIndexBuilder builder, ByteBuffer term, TokenTreeBuilder tokens)
     {
-        for (Map.Entry<Long, LongSet> token : tokens.getTokens().entrySet())
+        for (Pair<Long, LongSet> token : tokens)
         {
-            for (long position : token.getValue().toArray())
+            for (long position : token.right.toArray())
                 builder.add(term, keyAt(position), position);
         }
     }