Posted to commits@cassandra.apache.org by xe...@apache.org on 2016/01/24 04:36:12 UTC

[10/14] cassandra git commit: Integrate SASI index into Cassandra

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
new file mode 100644
index 0000000..5d85d00
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.utils.AbstractIterator;
+import org.apache.cassandra.index.sasi.utils.CombinedValue;
+import org.apache.cassandra.index.sasi.utils.MappedBuffer;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.utils.MergeIterator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType;
+
+// Note: all of the seek-able offsets contained in TokenTree should be sizeof(long)
+// even if currently only the lower int portion of them is used, because that makes
+// it possible to switch to an mmap implementation which supports long positions
+// without any on-disk format changes and/or re-indexing, should the need ever arise.
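+//
+// On-disk layout (as written by TokenTreeBuilder): the tree is stored in 4096-byte
+// blocks. Every node starts with a 19-byte shared header (info byte, short token
+// count, long min/max token), padded out to BLOCK_HEADER_BYTES (64); the root
+// additionally carries the magic (versions AB and later) plus tree-level token
+// count and min/max token longs. Leaves hold 16-byte entries followed by an
+// overflow trailer; interior nodes hold tokens followed by child block offsets.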
+public class TokenTree
+{
+    private static final int LONG_BYTES = Long.SIZE / 8;
+    private static final int SHORT_BYTES = Short.SIZE / 8;
+
+    private final Descriptor descriptor;
+    private final MappedBuffer file;
+    private final long startPos;
+    private final long treeMinToken;
+    private final long treeMaxToken;
+    private final long tokenCount;
+
+    @VisibleForTesting
+    protected TokenTree(MappedBuffer tokenTree)
+    {
+        this(Descriptor.CURRENT, tokenTree);
+    }
+
+    public TokenTree(Descriptor d, MappedBuffer tokenTree)
+    {
+        descriptor = d;
+        file = tokenTree;
+        startPos = file.position();
+
+        file.position(startPos + TokenTreeBuilder.SHARED_HEADER_BYTES);
+
+        if (!validateMagic())
+            throw new IllegalArgumentException("invalid token tree");
+
+        tokenCount = file.getLong();
+        treeMinToken = file.getLong();
+        treeMaxToken = file.getLong();
+    }
+
+    public long getCount()
+    {
+        return tokenCount;
+    }
+
+    public RangeIterator<Long, Token> iterator(Function<Long, DecoratedKey> keyFetcher)
+    {
+        return new TokenTreeIterator(file.duplicate(), keyFetcher);
+    }
+
+    public OnDiskToken get(final long searchToken, Function<Long, DecoratedKey> keyFetcher)
+    {
+        seekToLeaf(searchToken, file);
+        long leafStart = file.position();
+        short leafSize = file.getShort(leafStart + 1); // skip the info byte
+
+        file.position(leafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES); // skip to tokens
+        short tokenIndex = searchLeaf(searchToken, leafSize);
+
+        file.position(leafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES);
+
+        OnDiskToken token = OnDiskToken.getTokenAt(file, tokenIndex, leafSize, keyFetcher);
+        return token.get().equals(searchToken) ? token : null;
+    }
+
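+    // note: version AA predates the magic number, so for AA the tree is assumed valid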
+    private boolean validateMagic()
+    {
+        switch (descriptor.version.toString())
+        {
+            case Descriptor.VERSION_AA:
+                return true;
+            case Descriptor.VERSION_AB:
+                return TokenTreeBuilder.AB_MAGIC == file.getShort();
+            default:
+                return false;
+        }
+    }
+
+    // finds leaf that *could* contain token
+    private void seekToLeaf(long token, MappedBuffer file)
+    {
+        // this loop always seeks forward except for the first iteration
+        // where it may seek back to the root
+        long blockStart = startPos;
+        while (true)
+        {
+            file.position(blockStart);
+
+            byte info = file.get();
+            boolean isLeaf = (info & 1) == 1;
+
+            if (isLeaf)
+            {
+                file.position(blockStart);
+                break;
+            }
+
+            short tokenCount = file.getShort();
+
+            long minToken = file.getLong();
+            long maxToken = file.getLong();
+
+            long seekBase = blockStart + TokenTreeBuilder.BLOCK_HEADER_BYTES;
+            if (minToken > token)
+            {
+                // seek to beginning of child offsets to locate first child
+                file.position(seekBase + tokenCount * LONG_BYTES);
+                blockStart = (startPos + (int) file.getLong());
+            }
+            else if (maxToken < token)
+            {
+                // seek to end of child offsets to locate last child
+                file.position(seekBase + (2 * tokenCount) * LONG_BYTES);
+                blockStart = (startPos + (int) file.getLong());
+            }
+            else
+            {
+                // skip to end of block header/start of interior block tokens
+                file.position(seekBase);
+
+                short offsetIndex = searchBlock(token, tokenCount, file);
+
+                // searchBlock leaves the file pointer either at the beginning of the
+                // child offsets (when every token was smaller than the search token)
+                // or still inside the token area; either way, seek to child offset
+                // number offsetIndex
+                if (offsetIndex == tokenCount)
+                    file.position(file.position() + (offsetIndex * LONG_BYTES));
+                else
+                    file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * LONG_BYTES);
+
+                blockStart = (startPos + (int) file.getLong());
+            }
+        }
+    }
+
+    private short searchBlock(long searchToken, short tokenCount, MappedBuffer file)
+    {
+        short offsetIndex = 0;
+        for (int i = 0; i < tokenCount; i++)
+        {
+            long readToken = file.getLong();
+            if (searchToken < readToken)
+                break;
+
+            offsetIndex++;
+        }
+
+        return offsetIndex;
+    }
+
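+    // binary search over the 16-byte leaf entries. returns the index of the matching
+    // token or, when there is no exact match, the index where the search stopped;
+    // callers like get() verify equality themselves.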
+    private short searchLeaf(long searchToken, short tokenCount)
+    {
+        long base = file.position();
+
+        int start = 0;
+        int end = tokenCount;
+        int middle = 0;
+
+        while (start <= end)
+        {
+            middle = start + ((end - start) >> 1);
+
+            // each entry is 16 bytes wide, token is in bytes 4-11
+            long token = file.getLong(base + (middle * (2 * LONG_BYTES) + 4));
+
+            if (token == searchToken)
+                break;
+
+            if (token < searchToken)
+                start = middle + 1;
+            else
+                end = middle - 1;
+        }
+
+        return (short) middle;
+    }
+
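+    // iterates tokens in order by walking the leaf level block by block; the tree
+    // is only descended (to locate the first leaf) on the first actual request.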
+    public class TokenTreeIterator extends RangeIterator<Long, Token>
+    {
+        private final Function<Long, DecoratedKey> keyFetcher;
+        private final MappedBuffer file;
+
+        private long currentLeafStart;
+        private int currentTokenIndex;
+
+        private long leafMinToken;
+        private long leafMaxToken;
+        private short leafSize;
+
+        protected boolean firstIteration = true;
+        private boolean lastLeaf;
+
+        TokenTreeIterator(MappedBuffer file, Function<Long, DecoratedKey> keyFetcher)
+        {
+            super(treeMinToken, treeMaxToken, tokenCount);
+
+            this.file = file;
+            this.keyFetcher = keyFetcher;
+        }
+
+        protected Token computeNext()
+        {
+            maybeFirstIteration();
+
+            if (currentTokenIndex >= leafSize && lastLeaf)
+                return endOfData();
+
+            if (currentTokenIndex < leafSize) // tokens remaining in this leaf
+            {
+                return getTokenAt(currentTokenIndex++);
+            }
+            else // no more tokens remaining in this leaf
+            {
+                assert !lastLeaf;
+
+                seekToNextLeaf();
+                setupBlock();
+                return computeNext();
+            }
+        }
+
+        protected void performSkipTo(Long nextToken)
+        {
+            maybeFirstIteration();
+
+            if (nextToken <= leafMaxToken) // next is in this leaf block
+            {
+                searchLeaf(nextToken);
+            }
+            else // next is in a leaf block that needs to be found
+            {
+                seekToLeaf(nextToken, file);
+                setupBlock();
+                findNearest(nextToken);
+            }
+        }
+
+        private void setupBlock()
+        {
+            currentLeafStart = file.position();
+            currentTokenIndex = 0;
+
+            lastLeaf = (file.get() & (1 << TokenTreeBuilder.LAST_LEAF_SHIFT)) > 0;
+            leafSize = file.getShort();
+
+            leafMinToken = file.getLong();
+            leafMaxToken = file.getLong();
+
+            // seek to end of leaf header/start of data
+            file.position(currentLeafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES);
+        }
+
+        private void findNearest(Long next)
+        {
+            if (next > leafMaxToken && !lastLeaf)
+            {
+                seekToNextLeaf();
+                setupBlock();
+                findNearest(next);
+            }
+            else if (next > leafMinToken)
+                searchLeaf(next);
+        }
+
+        private void searchLeaf(long next)
+        {
+            while (currentTokenIndex < leafSize)
+            {
+                if (compareTokenAt(currentTokenIndex, next) >= 0)
+                    break;
+
+                currentTokenIndex++;
+            }
+        }
+
+        private int compareTokenAt(int idx, long toToken)
+        {
+            return Long.compare(file.getLong(getTokenPosition(idx)), toToken);
+        }
+
+        private Token getTokenAt(int idx)
+        {
+            return OnDiskToken.getTokenAt(file, idx, leafSize, keyFetcher);
+        }
+
+        private long getTokenPosition(int idx)
+        {
+            // skip 4 byte entry header to get position pointing directly at the entry's token
+            return OnDiskToken.getEntryPosition(idx, file) + (2 * SHORT_BYTES);
+        }
+
+        private void seekToNextLeaf()
+        {
+            file.position(currentLeafStart + TokenTreeBuilder.BLOCK_BYTES);
+        }
+
+        public void close() throws IOException
+        {
+            // nothing to do here
+        }
+
+        private void maybeFirstIteration()
+        {
+            // seek to the first token only when requested for the first time;
+            // this branch is highly predictable and saves a lot of work by not
+            // traversing the tree at construction time, when it isn't required at all.
+            if (!firstIteration)
+                return;
+
+            seekToLeaf(treeMinToken, file);
+            setupBlock();
+            firstIteration = false;
+        }
+    }
+
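+    // a token whose keys are resolved lazily from disk; merging equal tokens
+    // accumulates their sources, and iterator() merge-sorts the per-source key streams.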
+    public static class OnDiskToken extends Token
+    {
+        private final Set<TokenInfo> info = new HashSet<>(2);
+        private final Set<DecoratedKey> loadedKeys = new TreeSet<>(DecoratedKey.comparator);
+
+        public OnDiskToken(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
+        {
+            super(buffer.getLong(position + (2 * SHORT_BYTES)));
+            info.add(new TokenInfo(buffer, position, leafSize, keyFetcher));
+        }
+
+        public void merge(CombinedValue<Long> other)
+        {
+            if (!(other instanceof Token))
+                return;
+
+            Token o = (Token) other;
+            if (token != o.token)
+                throw new IllegalArgumentException(String.format("%s != %s", token, o.token));
+
+            if (o instanceof OnDiskToken)
+            {
+                info.addAll(((OnDiskToken) other).info);
+            }
+            else
+            {
+                Iterators.addAll(loadedKeys, o.iterator());
+            }
+        }
+
+        public Iterator<DecoratedKey> iterator()
+        {
+            List<Iterator<DecoratedKey>> keys = new ArrayList<>(info.size());
+
+            for (TokenInfo i : info)
+                keys.add(i.iterator());
+
+            if (!loadedKeys.isEmpty())
+                keys.add(loadedKeys.iterator());
+
+            return MergeIterator.get(keys, DecoratedKey.comparator, new MergeIterator.Reducer<DecoratedKey, DecoratedKey>()
+            {
+                DecoratedKey reduced = null;
+
+                public boolean trivialReduceIsTrivial()
+                {
+                    return true;
+                }
+
+                public void reduce(int idx, DecoratedKey current)
+                {
+                    reduced = current;
+                }
+
+                protected DecoratedKey getReduced()
+                {
+                    return reduced;
+                }
+            });
+        }
+
+        public Set<Long> getOffsets()
+        {
+            Set<Long> offsets = new HashSet<>();
+            for (TokenInfo i : info)
+            {
+                for (long offset : i.fetchOffsets())
+                    offsets.add(offset);
+            }
+
+            return offsets;
+        }
+
+        public static OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, Function<Long, DecoratedKey> keyFetcher)
+        {
+            return new OnDiskToken(buffer, getEntryPosition(idx, buffer), leafSize, keyFetcher);
+        }
+
+        private static long getEntryPosition(int idx, MappedBuffer file)
+        {
+            // info (4 bytes) + token (8 bytes) + offset (4 bytes) = 16 bytes
+            return file.position() + (idx * (2 * LONG_BYTES));
+        }
+    }
+
+    private static class TokenInfo
+    {
+        private final MappedBuffer buffer;
+        private final Function<Long, DecoratedKey> keyFetcher;
+
+        private final long position;
+        private final short leafSize;
+
+        public TokenInfo(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
+        {
+            this.keyFetcher = keyFetcher;
+            this.buffer = buffer;
+            this.position = position;
+            this.leafSize = leafSize;
+        }
+
+        public Iterator<DecoratedKey> iterator()
+        {
+            return new KeyIterator(keyFetcher, fetchOffsets());
+        }
+
+        public int hashCode()
+        {
+            return new HashCodeBuilder().append(keyFetcher).append(position).append(leafSize).build();
+        }
+
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof TokenInfo))
+                return false;
+
+            TokenInfo o = (TokenInfo) other;
+            return keyFetcher == o.keyFetcher && position == o.position;
+        }
+
+        private long[] fetchOffsets()
+        {
+            short info = buffer.getShort(position);
+            short offsetShort = buffer.getShort(position + SHORT_BYTES);
+            int offsetInt = buffer.getInt(position + (2 * SHORT_BYTES) + LONG_BYTES);
+
+            EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK);
+
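+            // entry layout (see TokenTreeBuilder.Leaf.LeafEntry#serialize):
+            // short type | short offsetExtra | long token | int offsetData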
+            switch (type)
+            {
+                case SIMPLE:
+                    return new long[] { offsetInt };
+
+                case OVERFLOW:
+                    long[] offsets = new long[offsetShort]; // offsetShort contains the count of overflowed offsets
+                    long offsetPos = (buffer.position() + (2 * (leafSize * LONG_BYTES)) + (offsetInt * LONG_BYTES));
+
+                    for (int i = 0; i < offsetShort; i++)
+                        offsets[i] = buffer.getLong(offsetPos + (i * LONG_BYTES));
+
+                    return offsets;
+
+                case FACTORED:
+                    // mask the short to keep sign extension from corrupting the bottom 16 bits
+                    return new long[] { (((long) offsetInt) << Short.SIZE) + (offsetShort & 0xFFFF) };
+
+                case PACKED:
+                    return new long[] { offsetShort, offsetInt };
+
+                default:
+                    throw new IllegalStateException("Unknown entry type: " + type);
+            }
+        }
+    }
+
+    private static class KeyIterator extends AbstractIterator<DecoratedKey>
+    {
+        private final Function<Long, DecoratedKey> keyFetcher;
+        private final long[] offsets;
+        private int index = 0;
+
+        public KeyIterator(Function<Long, DecoratedKey> keyFetcher, long[] offsets)
+        {
+            this.keyFetcher = keyFetcher;
+            this.offsets = offsets;
+        }
+
+        public DecoratedKey computeNext()
+        {
+            return index < offsets.length ? keyFetcher.apply(offsets[index++]) : endOfData();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
new file mode 100644
index 0000000..e10b057
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
@@ -0,0 +1,839 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import com.carrotsearch.hppc.LongArrayList;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.google.common.collect.AbstractIterator;
+
+public class TokenTreeBuilder
+{
+    // note: ordinal positions are used here, do not change order
+    enum EntryType
+    {
+        SIMPLE, FACTORED, PACKED, OVERFLOW;
+
+        public static EntryType of(int ordinal)
+        {
+            if (ordinal < 0 || ordinal >= values().length)
+                throw new IllegalArgumentException("Unknown ordinal: " + ordinal);
+
+            return values()[ordinal];
+        }
+    }
+
+    public static final int BLOCK_BYTES = 4096;
+    public static final int BLOCK_HEADER_BYTES = 64;
+    public static final int OVERFLOW_TRAILER_BYTES = 64;
+    public static final int OVERFLOW_TRAILER_CAPACITY = OVERFLOW_TRAILER_BYTES / 8;
+    public static final int TOKENS_PER_BLOCK = (BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / 16; // (4096 - 64 - 64) / 16 = 248
+    public static final long MAX_OFFSET = (1L << 47) - 1; // 48 bits for (signed) offset
+    public static final byte LAST_LEAF_SHIFT = 1;
+    public static final byte SHARED_HEADER_BYTES = 19;
+    public static final byte ENTRY_TYPE_MASK = 0x03;
+    public static final short AB_MAGIC = 0x5A51;
+
+    private final SortedMap<Long, LongSet> tokens = new TreeMap<>();
+    private int numBlocks;
+
+    private Node root;
+    private InteriorNode rightmostParent;
+    private Leaf leftmostLeaf;
+    private Leaf rightmostLeaf;
+    private long tokenCount = 0;
+    private long treeMinToken;
+    private long treeMaxToken;
+
+    public TokenTreeBuilder()
+    {}
+
+    public TokenTreeBuilder(SortedMap<Long, LongSet> data)
+    {
+        add(data);
+    }
+
+    public void add(Long token, long keyPosition)
+    {
+        LongSet found = tokens.get(token);
+        if (found == null)
+            tokens.put(token, (found = new LongOpenHashSet(2)));
+
+        found.add(keyPosition);
+    }
+
+    public void add(SortedMap<Long, LongSet> data)
+    {
+        for (Map.Entry<Long, LongSet> newEntry : data.entrySet())
+        {
+            LongSet found = tokens.get(newEntry.getKey());
+            if (found == null)
+                tokens.put(newEntry.getKey(), (found = new LongOpenHashSet(4)));
+
+            for (LongCursor offset : newEntry.getValue())
+                found.add(offset.value);
+        }
+    }
+
+    public TokenTreeBuilder finish()
+    {
+        maybeBulkLoad();
+        return this;
+    }
+
+    public SortedMap<Long, LongSet> getTokens()
+    {
+        return tokens;
+    }
+
+    public long getTokenCount()
+    {
+        return tokenCount;
+    }
+
+    public int serializedSize()
+    {
+        if (numBlocks == 1)
+            return (BLOCK_HEADER_BYTES + ((int) tokenCount * 16));
+        else
+            return numBlocks * BLOCK_BYTES;
+    }
+
+    public void write(DataOutputPlus out) throws IOException
+    {
+        ByteBuffer blockBuffer = ByteBuffer.allocate(BLOCK_BYTES);
+        Iterator<Node> levelIterator = root.levelIterator();
+        long childBlockIndex = 1;
+
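+        // serialize the tree level by level, root first; childBlockIndex tracks the
+        // block number at which a node's first child will be written, which is what
+        // interior nodes use to compute child block offsets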
+        while (levelIterator != null)
+        {
+            Node firstChild = null;
+            while (levelIterator.hasNext())
+            {
+                Node block = levelIterator.next();
+
+                if (firstChild == null && !block.isLeaf())
+                    firstChild = ((InteriorNode) block).children.get(0);
+
+                block.serialize(childBlockIndex, blockBuffer);
+                flushBuffer(blockBuffer, out, numBlocks != 1);
+
+                childBlockIndex += block.childCount();
+            }
+
+            levelIterator = (firstChild == null) ? null : firstChild.levelIterator();
+        }
+    }
+
+    public Iterator<Pair<Long, LongSet>> iterator()
+    {
+        return new TokenIterator(leftmostLeaf.levelIterator());
+    }
+
+    private void maybeBulkLoad()
+    {
+        if (root == null)
+            bulkLoad();
+    }
+
+    private void flushBuffer(ByteBuffer buffer, DataOutputPlus o, boolean align) throws IOException
+    {
+        // pad out to the block boundary before flushing so each block occupies exactly BLOCK_BYTES on disk
+        if (align)
+            alignBuffer(buffer, BLOCK_BYTES);
+
+        buffer.flip();
+        o.write(buffer);
+        buffer.clear();
+    }
+
+    private static void alignBuffer(ByteBuffer buffer, int blockSize)
+    {
+        long curPos = buffer.position();
+        if ((curPos & (blockSize - 1)) != 0) // align on the block boundary if needed
+            buffer.position((int) FBUtilities.align(curPos, blockSize));
+    }
+
+    private void bulkLoad()
+    {
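+        // build the tree bottom-up: slice the sorted token map into leaves of at most
+        // TOKENS_PER_BLOCK entries and append each leaf to the rightmost interior node,
+        // splitting parents upward as they fill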
+        tokenCount = tokens.size();
+        treeMinToken = tokens.firstKey();
+        treeMaxToken = tokens.lastKey();
+        numBlocks = 1;
+
+        // special case the tree that only has a single block in it (so we don't create a useless root)
+        if (tokenCount <= TOKENS_PER_BLOCK)
+        {
+            leftmostLeaf = new Leaf(tokens);
+            rightmostLeaf = leftmostLeaf;
+            root = leftmostLeaf;
+        }
+        else
+        {
+            root = new InteriorNode();
+            rightmostParent = (InteriorNode) root;
+
+            int i = 0;
+            Leaf lastLeaf = null;
+            Long firstToken = tokens.firstKey();
+            Long finalToken = tokens.lastKey();
+            Long lastToken;
+            for (Long token : tokens.keySet())
+            {
+                if (i == 0 || (i % TOKENS_PER_BLOCK != 0 && i != (tokenCount - 1)))
+                {
+                    i++;
+                    continue;
+                }
+
+                lastToken = token;
+                Leaf leaf = (i != (tokenCount - 1) || token.equals(finalToken)) ?
+                        new Leaf(tokens.subMap(firstToken, lastToken)) : new Leaf(tokens.tailMap(firstToken));
+
+                if (i == TOKENS_PER_BLOCK)
+                    leftmostLeaf = leaf;
+                else
+                    lastLeaf.next = leaf;
+
+                rightmostParent.add(leaf);
+                lastLeaf = leaf;
+                rightmostLeaf = leaf;
+                firstToken = lastToken;
+                i++;
+                numBlocks++;
+
+                if (token.equals(finalToken))
+                {
+                    Leaf finalLeaf = new Leaf(tokens.tailMap(token));
+                    lastLeaf.next = finalLeaf;
+                    rightmostParent.add(finalLeaf);
+                    rightmostLeaf = finalLeaf;
+                    numBlocks++;
+                }
+            }
+
+        }
+    }
+
+    private abstract class Node
+    {
+        protected InteriorNode parent;
+        protected Node next;
+        protected Long nodeMinToken, nodeMaxToken;
+
+        public abstract void serialize(long childBlockIndex, ByteBuffer buf);
+        public abstract int childCount();
+        public abstract int tokenCount();
+        public abstract Long smallestToken();
+
+        public Iterator<Node> levelIterator()
+        {
+            return new LevelIterator(this);
+        }
+
+        public boolean isLeaf()
+        {
+            return (this instanceof Leaf);
+        }
+
+        protected boolean isLastLeaf()
+        {
+            return this == rightmostLeaf;
+        }
+
+        protected boolean isRoot()
+        {
+            return this == root;
+        }
+
+        protected void updateTokenRange(long token)
+        {
+            nodeMinToken = nodeMinToken == null ? token : Math.min(nodeMinToken, token);
+            nodeMaxToken = nodeMaxToken == null ? token : Math.max(nodeMaxToken, token);
+        }
+
+        protected void serializeHeader(ByteBuffer buf)
+        {
+            Header header;
+            if (isRoot())
+                header = new RootHeader();
+            else if (!isLeaf())
+                header = new InteriorNodeHeader();
+            else
+                header = new LeafHeader();
+
+            header.serialize(buf);
+            alignBuffer(buf, BLOCK_HEADER_BYTES);
+        }
+
+        private abstract class Header
+        {
+            public void serialize(ByteBuffer buf)
+            {
+                buf.put(infoByte())
+                        .putShort((short) (tokenCount()))
+                        .putLong(nodeMinToken)
+                        .putLong(nodeMaxToken);
+            }
+
+            protected abstract byte infoByte();
+        }
+
+        private class RootHeader extends Header
+        {
+            public void serialize(ByteBuffer buf)
+            {
+                super.serialize(buf);
+                writeMagic(buf);
+                buf.putLong(tokenCount)
+                        .putLong(treeMinToken)
+                        .putLong(treeMaxToken);
+            }
+
+            protected byte infoByte()
+            {
+                // if leaf, set leaf indicator and last leaf indicator (bits 0 & 1)
+                // if not leaf, clear both bits
+                return (byte) ((isLeaf()) ? 3 : 0);
+            }
+
+            protected void writeMagic(ByteBuffer buf)
+            {
+                switch (Descriptor.CURRENT_VERSION)
+                {
+                    case Descriptor.VERSION_AB:
+                        buf.putShort(AB_MAGIC);
+                        break;
+                    default:
+                        break;
+                }
+
+            }
+        }
+
+        private class InteriorNodeHeader extends Header
+        {
+            // bit 0 (leaf indicator) & bit 1 (last leaf indicator) cleared
+            protected byte infoByte()
+            {
+                return 0;
+            }
+        }
+
+        private class LeafHeader extends Header
+        {
+            // bit 0 set as leaf indicator
+            // bit 1 set if this is last leaf of data
+            protected byte infoByte()
+            {
+                byte infoByte = 1;
+                infoByte |= (isLastLeaf()) ? (1 << LAST_LEAF_SHIFT) : 0;
+
+                return infoByte;
+            }
+        }
+
+    }
+
+    private class Leaf extends Node
+    {
+        private final SortedMap<Long, LongSet> tokens;
+        private LongArrayList overflowCollisions;
+
+        Leaf(SortedMap<Long, LongSet> data)
+        {
+            nodeMinToken = data.firstKey();
+            nodeMaxToken = data.lastKey();
+            tokens = data;
+        }
+
+        public Long largestToken()
+        {
+            return nodeMaxToken;
+        }
+
+        public void serialize(long childBlockIndex, ByteBuffer buf)
+        {
+            serializeHeader(buf);
+            serializeData(buf);
+            serializeOverflowCollisions(buf);
+        }
+
+        public int childCount()
+        {
+            return 0;
+        }
+
+        public int tokenCount()
+        {
+            return tokens.size();
+        }
+
+        public Long smallestToken()
+        {
+            return nodeMinToken;
+        }
+
+        public Iterator<Map.Entry<Long, LongSet>> tokenIterator()
+        {
+            return tokens.entrySet().iterator();
+        }
+
+        private void serializeData(ByteBuffer buf)
+        {
+            for (Map.Entry<Long, LongSet> entry : tokens.entrySet())
+                createEntry(entry.getKey(), entry.getValue()).serialize(buf);
+        }
+
+        private void serializeOverflowCollisions(ByteBuffer buf)
+        {
+            if (overflowCollisions != null)
+                for (LongCursor offset : overflowCollisions)
+                    buf.putLong(offset.value);
+        }
+
+        private LeafEntry createEntry(final long tok, final LongSet offsets)
+        {
+            int offsetCount = offsets.size();
+            switch (offsetCount)
+            {
+                case 0:
+                    throw new AssertionError("no offsets for token " + tok);
+                case 1:
+                    long offset = offsets.toArray()[0];
+                    if (offset > MAX_OFFSET)
+                        throw new AssertionError("offset " + offset + " cannot be greater than " + MAX_OFFSET);
+                    else if (offset <= Integer.MAX_VALUE)
+                        return new SimpleLeafEntry(tok, offset);
+                    else
+                        return new FactoredOffsetLeafEntry(tok, offset);
+                case 2:
+                    long[] rawOffsets = offsets.toArray();
+                    if (rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= Integer.MAX_VALUE &&
+                            (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= Short.MAX_VALUE))
+                        return new PackedCollisionLeafEntry(tok, rawOffsets);
+                    else
+                        return createOverflowEntry(tok, offsetCount, offsets);
+                default:
+                    return createOverflowEntry(tok, offsetCount, offsets);
+            }
+        }
+
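+        // the overflow trailer holds at most OVERFLOW_TRAILER_CAPACITY (64 / 8 = 8)
+        // offsets per leaf; exceeding that is a hard error at index build time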
+        private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongSet offsets)
+        {
+            if (overflowCollisions == null)
+                overflowCollisions = new LongArrayList();
+
+            LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) overflowCollisions.size(), (short) offsetCount);
+            for (LongCursor o : offsets)
+            {
+                if (overflowCollisions.size() == OVERFLOW_TRAILER_CAPACITY)
+                    throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf");
+                else
+                    overflowCollisions.add(o.value);
+            }
+            return entry;
+        }
+
+        private abstract class LeafEntry
+        {
+            protected final long token;
+
+            abstract public EntryType type();
+            abstract public int offsetData();
+            abstract public short offsetExtra();
+
+            public LeafEntry(final long tok)
+            {
+                token = tok;
+            }
+
+            public void serialize(ByteBuffer buf)
+            {
+                buf.putShort((short) type().ordinal())
+                        .putShort(offsetExtra())
+                        .putLong(token)
+                        .putInt(offsetData());
+            }
+
+        }
+
+        // assumes there is a single offset and the offset is <= Integer.MAX_VALUE
+        private class SimpleLeafEntry extends LeafEntry
+        {
+            private final long offset;
+
+            public SimpleLeafEntry(final long tok, final long off)
+            {
+                super(tok);
+                offset = off;
+            }
+
+            public EntryType type()
+            {
+                return EntryType.SIMPLE;
+            }
+
+            public int offsetData()
+            {
+                return (int) offset;
+            }
+
+            public short offsetExtra()
+            {
+                return 0;
+            }
+        }
+
+        // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET.
+        // the top 32 of the (at most) 48 offset bits are stored where the offset is
+        // normally stored, and the bottom 16 bits are stored in the entry header.
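+        // e.g. offset 0x123456789ABCL: offsetData() = 0x12345678 (bits 16-47) and
+        // offsetExtra() = (short) 0x9ABC (bits 0-15); TokenTree reassembles the value
+        // as (offsetData << 16) + (offsetExtra & 0xFFFF)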
+        private class FactoredOffsetLeafEntry extends LeafEntry
+        {
+            private final long offset;
+
+            public FactoredOffsetLeafEntry(final long tok, final long off)
+            {
+                super(tok);
+                offset = off;
+            }
+
+            public EntryType type()
+            {
+                return EntryType.FACTORED;
+            }
+
+            public int offsetData()
+            {
+                return (int) (offset >>> Short.SIZE);
+            }
+
+            public short offsetExtra()
+            {
+                return (short) offset;
+            }
+        }
+
+        // holds an entry with two offsets that can be packed in an int & a short
+        // the int offset is stored where offset is normally stored. short offset is
+        // stored in entry header
+        private class PackedCollisionLeafEntry extends LeafEntry
+        {
+            private short smallerOffset;
+            private int largerOffset;
+
+            public PackedCollisionLeafEntry(final long tok, final long[] offs)
+            {
+                super(tok);
+
+                smallerOffset = (short) Math.min(offs[0], offs[1]);
+                largerOffset = (int) Math.max(offs[0], offs[1]);
+            }
+
+            public EntryType type()
+            {
+                return EntryType.PACKED;
+            }
+
+            public int offsetData()
+            {
+                return largerOffset;
+            }
+
+            public short offsetExtra()
+            {
+                return smallerOffset;
+            }
+        }
+
+        // holds an entry with three or more offsets, or two offsets that cannot
+        // be packed into an int & a short. the index into the overflow list
+        // is stored where the offset is normally stored. the number of overflowed offsets
+        // for the entry is stored in the entry header
+        private class OverflowCollisionLeafEntry extends LeafEntry
+        {
+            private final short startIndex;
+            private final short count;
+
+            public OverflowCollisionLeafEntry(final long tok, final short collisionStartIndex, final short collisionCount)
+            {
+                super(tok);
+                startIndex = collisionStartIndex;
+                count = collisionCount;
+            }
+
+            public EntryType type()
+            {
+                return EntryType.OVERFLOW;
+            }
+
+            public int offsetData()
+            {
+                return startIndex;
+            }
+
+            public short offsetExtra()
+            {
+                return count;
+            }
+
+        }
+
+    }
+
+    private class InteriorNode extends Node
+    {
+        private List<Long> tokens = new ArrayList<>(TOKENS_PER_BLOCK);
+        private List<Node> children = new ArrayList<>(TOKENS_PER_BLOCK + 1);
+        private int position = 0; // TODO (jwest): can get rid of this and use array size
+
+        public void serialize(long childBlockIndex, ByteBuffer buf)
+        {
+            serializeHeader(buf);
+            serializeTokens(buf);
+            serializeChildOffsets(childBlockIndex, buf);
+        }
+
+        public int childCount()
+        {
+            return children.size();
+        }
+
+        public int tokenCount()
+        {
+            return tokens.size();
+        }
+
+        public Long smallestToken()
+        {
+            return tokens.get(0);
+        }
+
+        protected void add(Long token, InteriorNode leftChild, InteriorNode rightChild)
+        {
+            int pos = tokens.size();
+            if (pos == TOKENS_PER_BLOCK)
+            {
+                InteriorNode sibling = split();
+                sibling.add(token, leftChild, rightChild);
+            }
+            else
+            {
+                if (leftChild != null)
+                    children.add(pos, leftChild);
+
+                if (rightChild != null)
+                {
+                    children.add(pos + 1, rightChild);
+                    rightChild.parent = this;
+                }
+
+                updateTokenRange(token);
+                tokens.add(pos, token);
+            }
+        }
+
+        protected void add(Leaf node)
+        {
+            if (position == (TOKENS_PER_BLOCK + 1))
+            {
+                rightmostParent = split();
+                rightmostParent.add(node);
+            }
+            else
+            {
+                node.parent = this;
+                children.add(position, node);
+                position++;
+
+                // the first child of an interior node contributes no separator token,
+                // so there is nothing to store for it; position has already been
+                // incremented for the next add, hence the position - 1 check
+                if (position - 1 == 0)
+                    return;
+
+                // separator tokens sit one position behind their right child, and 2 is
+                // subtracted because position has already been incremented for the next add
+                Long smallestToken = node.smallestToken();
+                updateTokenRange(smallestToken);
+                tokens.add(position - 2, smallestToken);
+            }
+        }
+
+        protected InteriorNode split()
+        {
+            Pair<Long, InteriorNode> splitResult = splitBlock();
+            Long middleValue = splitResult.left;
+            InteriorNode sibling = splitResult.right;
+            InteriorNode leftChild = null;
+
+            // create a new root if necessary
+            if (parent == null)
+            {
+                parent = new InteriorNode();
+                root = parent;
+                sibling.parent = parent;
+                leftChild = this;
+                numBlocks++;
+            }
+
+            parent.add(middleValue, leftChild, sibling);
+
+            return sibling;
+        }
+
+        protected Pair<Long, InteriorNode> splitBlock()
+        {
+            final int splitPosition = TOKENS_PER_BLOCK - 2;
+            InteriorNode sibling = new InteriorNode();
+            sibling.parent = parent;
+            next = sibling;
+
+            Long middleValue = tokens.get(splitPosition);
+
+            for (int i = splitPosition; i < TOKENS_PER_BLOCK; i++)
+            {
+                if (i != TOKENS_PER_BLOCK && i != splitPosition)
+                {
+                    long token = tokens.get(i);
+                    sibling.updateTokenRange(token);
+                    sibling.tokens.add(token);
+                }
+
+                Node child = children.get(i + 1);
+                child.parent = sibling;
+                sibling.children.add(child);
+                sibling.position++;
+            }
+
+            for (int i = TOKENS_PER_BLOCK; i >= splitPosition; i--)
+            {
+                if (i != TOKENS_PER_BLOCK)
+                    tokens.remove(i);
+
+                if (i != splitPosition)
+                    children.remove(i);
+            }
+
+            nodeMinToken = smallestToken();
+            nodeMaxToken = tokens.get(tokens.size() - 1);
+            numBlocks++;
+
+            return Pair.create(middleValue, sibling);
+        }
+
+        protected boolean isFull()
+        {
+            return (position >= TOKENS_PER_BLOCK + 1);
+        }
+
+        private void serializeTokens(ByteBuffer buf)
+        {
+            for (Long token : tokens)
+                buf.putLong(token);
+        }
+
+        private void serializeChildOffsets(long childBlockIndex, ByteBuffer buf)
+        {
+            for (int i = 0; i < children.size(); i++)
+                buf.putLong((childBlockIndex + i) * BLOCK_BYTES);
+        }
+    }
+
+    public static class LevelIterator extends AbstractIterator<Node>
+    {
+        private Node currentNode;
+
+        LevelIterator(Node first)
+        {
+            currentNode = first;
+        }
+
+        public Node computeNext()
+        {
+            if (currentNode == null)
+                return endOfData();
+
+            Node returnNode = currentNode;
+            currentNode = returnNode.next;
+
+            return returnNode;
+        }
+    }
+
+    public static class TokenIterator extends AbstractIterator<Pair<Long, LongSet>>
+    {
+        private Iterator<Node> levelIterator;
+        private Iterator<Map.Entry<Long, LongSet>> currentIterator;
+
+        TokenIterator(Iterator<Node> level)
+        {
+            levelIterator = level;
+            if (levelIterator.hasNext())
+                currentIterator = ((Leaf) levelIterator.next()).tokenIterator();
+        }
+
+        public Pair<Long, LongSet> computeNext()
+        {
+            if (currentIterator != null && currentIterator.hasNext())
+            {
+                Map.Entry<Long, LongSet> next = currentIterator.next();
+                return Pair.create(next.getKey(), next.getValue());
+            }
+            else
+            {
+                if (!levelIterator.hasNext())
+                    return endOfData();
+                else
+                {
+                    currentIterator = ((Leaf) levelIterator.next()).tokenIterator();
+                    return computeNext();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/exceptions/TimeQuotaExceededException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/exceptions/TimeQuotaExceededException.java b/src/java/org/apache/cassandra/index/sasi/exceptions/TimeQuotaExceededException.java
new file mode 100644
index 0000000..af577dc
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/exceptions/TimeQuotaExceededException.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.exceptions;
+
+public class TimeQuotaExceededException extends RuntimeException
+{}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
new file mode 100644
index 0000000..cf7f3a5
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.memory;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.index.sasi.utils.TypeUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IndexMemtable
+{
+    private static final Logger logger = LoggerFactory.getLogger(IndexMemtable.class);
+
+    private final MemIndex index;
+
+    public IndexMemtable(ColumnIndex columnIndex)
+    {
+        this.index = MemIndex.forColumn(columnIndex.keyValidator(), columnIndex);
+    }
+
+    public long index(DecoratedKey key, ByteBuffer value)
+    {
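+        // empty values are not indexed; a value that fails validation gets one
+        // chance to be upcast to the validator's type before being rejected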
+        if (value == null || value.remaining() == 0)
+            return 0;
+
+        AbstractType<?> validator = index.columnIndex.getValidator();
+        if (!TypeUtil.isValid(value, validator))
+        {
+            int size = value.remaining();
+            if ((value = TypeUtil.tryUpcast(value, validator)) == null)
+            {
+                logger.error("Can't add column {} to index for key: {}, value size {} bytes, validator: {}.",
+                             index.columnIndex.getColumnName(),
+                             index.columnIndex.keyValidator().getString(key.getKey()),
+                             size,
+                             validator);
+                return 0;
+            }
+        }
+
+        return index.add(key, value);
+    }
+
+    public RangeIterator<Long, Token> search(Expression expression)
+    {
+        return index == null ? null : index.search(expression);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
new file mode 100644
index 0000000..293e2ee
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.memory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.utils.AbstractIterator;
+import org.apache.cassandra.index.sasi.utils.CombinedValue;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+
+import com.google.common.collect.PeekingIterator;
+
+public class KeyRangeIterator extends RangeIterator<Long, Token>
+{
+    private final DKIterator iterator;
+
+    public KeyRangeIterator(ConcurrentSkipListSet<DecoratedKey> keys)
+    {
+        super((Long) keys.first().getToken().getTokenValue(), (Long) keys.last().getToken().getTokenValue(), keys.size());
+        this.iterator = new DKIterator(keys.iterator());
+    }
+
+    protected Token computeNext()
+    {
+        return iterator.hasNext() ? new DKToken(iterator.next()) : endOfData();
+    }
+
+    protected void performSkipTo(Long nextToken)
+    {
+        while (iterator.hasNext())
+        {
+            DecoratedKey key = iterator.peek();
+            if (Long.compare((long) key.getToken().getTokenValue(), nextToken) >= 0)
+                break;
+
+            // consume smaller key
+            iterator.next();
+        }
+    }
+
+    public void close() throws IOException
+    {}
+
+    private static class DKIterator extends AbstractIterator<DecoratedKey> implements PeekingIterator<DecoratedKey>
+    {
+        private final Iterator<DecoratedKey> keys;
+
+        public DKIterator(Iterator<DecoratedKey> keys)
+        {
+            this.keys = keys;
+        }
+
+        protected DecoratedKey computeNext()
+        {
+            return keys.hasNext() ? keys.next() : endOfData();
+        }
+    }
+
+    private static class DKToken extends Token
+    {
+        private final SortedSet<DecoratedKey> keys;
+
+        public DKToken(final DecoratedKey key)
+        {
+            super((long) key.getToken().getTokenValue());
+
+            keys = new TreeSet<>(DecoratedKey.comparator);
+            keys.add(key);
+        }
+
+        public void merge(CombinedValue<Long> other)
+        {
+            if (!(other instanceof Token))
+                return;
+
+            Token o = (Token) other;
+            assert o.get().equals(token);
+
+            if (o instanceof DKToken)
+            {
+                keys.addAll(((DKToken) o).keys);
+            }
+            else
+            {
+                for (DecoratedKey key : o)
+                    keys.add(key);
+            }
+        }
+
+        public Iterator<DecoratedKey> iterator()
+        {
+            return keys.iterator();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
new file mode 100644
index 0000000..22d6c9e
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.memory;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+import org.github.jamm.MemoryMeter;
+
+public abstract class MemIndex
+{
+    protected final AbstractType<?> keyValidator;
+    protected final ColumnIndex columnIndex;
+
+    protected MemIndex(AbstractType<?> keyValidator, ColumnIndex columnIndex)
+    {
+        this.keyValidator = keyValidator;
+        this.columnIndex = columnIndex;
+    }
+
+    public abstract long add(DecoratedKey key, ByteBuffer value);
+    public abstract RangeIterator<Long, Token> search(Expression expression);
+
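+    // literal columns are served by the trie-based index, all other types by the
+    // skip list ordered by the column's validator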
+    public static MemIndex forColumn(AbstractType<?> keyValidator, ColumnIndex columnIndex)
+    {
+        return columnIndex.isLiteral()
+                ? new TrieMemIndex(keyValidator, columnIndex)
+                : new SkipListMemIndex(keyValidator, columnIndex);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
new file mode 100644
index 0000000..69b57d0
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.memory;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public class SkipListMemIndex extends MemIndex
+{
+    public static final int CSLM_OVERHEAD = 128; // average per-entry overhead of ConcurrentSkipListMap
+
+    private final ConcurrentSkipListMap<ByteBuffer, ConcurrentSkipListSet<DecoratedKey>> index;
+
+    public SkipListMemIndex(AbstractType<?> keyValidator, ColumnIndex columnIndex)
+    {
+        super(keyValidator, columnIndex);
+        index = new ConcurrentSkipListMap<>(columnIndex.getValidator());
+    }
+
+    public long add(DecoratedKey key, ByteBuffer value)
+    {
+        long overhead = CSLM_OVERHEAD; // DecoratedKeys are shared, so their size is not counted
+        ConcurrentSkipListSet<DecoratedKey> keys = index.get(value);
+
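+        // create the per-value key set atomically on first insert; if we lose the
+        // putIfAbsent race, reuse the winner's set and don't count the value bytes again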
+        if (keys == null)
+        {
+            ConcurrentSkipListSet<DecoratedKey> newKeys = new ConcurrentSkipListSet<>(DecoratedKey.comparator);
+            keys = index.putIfAbsent(value, newKeys);
+            if (keys == null)
+            {
+                overhead += CSLM_OVERHEAD + value.remaining();
+                keys = newKeys;
+            }
+        }
+
+        keys.add(key);
+
+        return overhead;
+    }
+
+    public RangeIterator<Long, Token> search(Expression expression)
+    {
+        ByteBuffer min = expression.lower == null ? null : expression.lower.value;
+        ByteBuffer max = expression.upper == null ? null : expression.upper.value;
+
+        SortedMap<ByteBuffer, ConcurrentSkipListSet<DecoratedKey>> search;
+
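+        // narrow the skip list view down to the requested bounds; a completely
+        // unbounded expression is not supported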
+        if (min == null && max == null)
+        {
+            throw new IllegalArgumentException("Expression is required to have at least one bound (lower or upper).");
+        }
+        if (min != null && max != null)
+        {
+            search = index.subMap(min, expression.lower.inclusive, max, expression.upper.inclusive);
+        }
+        else if (min == null)
+        {
+            search = index.headMap(max, expression.upper.inclusive);
+        }
+        else
+        {
+            search = index.tailMap(min, expression.lower.inclusive);
+        }
+
+        RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+        search.values().stream()
+                       .filter(keys -> !keys.isEmpty())
+                       .forEach(keys -> builder.add(new KeyRangeIterator(keys)));
+
+        return builder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
new file mode 100644
index 0000000..e4ee6eb
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.memory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
+import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
+import com.googlecode.concurrenttrees.suffix.ConcurrentSuffixTree;
+import com.googlecode.concurrenttrees.radix.node.concrete.SmartArrayBasedNodeFactory;
+import com.googlecode.concurrenttrees.radix.node.Node;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.index.sasi.memory.SkipListMemIndex.CSLM_OVERHEAD;
+
+public class TrieMemIndex extends MemIndex
+{
+    private static final Logger logger = LoggerFactory.getLogger(TrieMemIndex.class);
+
+    private final ConcurrentTrie index;
+
+    public TrieMemIndex(AbstractType<?> keyValidator, ColumnIndex columnIndex)
+    {
+        super(keyValidator, columnIndex);
+
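+        // CONTAINS indexes every suffix of a term (suffix tree) so substring matches
+        // are possible; PREFIX only matches from the start, so a radix tree is enough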
+        switch (columnIndex.getMode().mode)
+        {
+            case CONTAINS:
+                index = new ConcurrentSuffixTrie(columnIndex.getDefinition());
+                break;
+
+            case PREFIX:
+                index = new ConcurrentPrefixTrie(columnIndex.getDefinition());
+                break;
+
+            default:
+                throw new IllegalStateException("Unsupported mode: " + columnIndex.getMode().mode);
+        }
+    }
+
+    public long add(DecoratedKey key, ByteBuffer value)
+    {
+        AbstractAnalyzer analyzer = columnIndex.getAnalyzer();
+        analyzer.reset(value.duplicate());
+
+        long size = 0;
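+        // the analyzer may split the value into multiple terms (e.g. when the column
+        // is analyzed), each of which is indexed under the same decorated key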
+        while (analyzer.hasNext())
+        {
+            ByteBuffer term = analyzer.next();
+
+            if (term.remaining() >= OnDiskIndexBuilder.MAX_TERM_SIZE)
+            {
+                logger.info("Can't add term of column {} to index for key: {}, term size {} bytes, max allowed size {} bytes, use analyzed = true (if not yet set) for that column.",
+                            columnIndex.getColumnName(),
+                            keyValidator.getString(key.getKey()),
+                            term.remaining(),
+                            OnDiskIndexBuilder.MAX_TERM_SIZE);
+                continue;
+            }
+
+            size += index.add(columnIndex.getValidator().getString(term), key);
+        }
+
+        return size;
+    }
+
+    public RangeIterator<Long, Token> search(Expression expression)
+    {
+        return index.search(expression);
+    }
+
+    private static abstract class ConcurrentTrie
+    {
+        public static final SizeEstimatingNodeFactory NODE_FACTORY = new SizeEstimatingNodeFactory();
+
+        protected final ColumnDefinition definition;
+
+        public ConcurrentTrie(ColumnDefinition column)
+        {
+            definition = column;
+        }
+
+        public long add(String value, DecoratedKey key)
+        {
+            long overhead = CSLM_OVERHEAD;
+            ConcurrentSkipListSet<DecoratedKey> keys = get(value);
+            if (keys == null)
+            {
+                ConcurrentSkipListSet<DecoratedKey> newKeys = new ConcurrentSkipListSet<>(DecoratedKey.comparator);
+                keys = putIfAbsent(value, newKeys);
+                if (keys == null)
+                {
+                    overhead += CSLM_OVERHEAD + value.length();
+                    keys = newKeys;
+                }
+            }
+
+            keys.add(key);
+
+            // get and reset new memory size allocated by current thread
+            overhead += NODE_FACTORY.currentUpdateSize();
+            NODE_FACTORY.reset();
+
+            return overhead;
+        }
+
+        public RangeIterator<Long, Token> search(Expression expression)
+        {
+            assert expression.getOp() == Expression.Op.EQ; // means that min == max
+
+            ByteBuffer prefix = expression.lower == null ? null : expression.lower.value;
+
+            Iterable<ConcurrentSkipListSet<DecoratedKey>> search = search(definition.cellValueType().getString(prefix));
+
+            RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+            for (ConcurrentSkipListSet<DecoratedKey> keys : search)
+            {
+                if (!keys.isEmpty())
+                    builder.add(new KeyRangeIterator(keys));
+            }
+
+            return builder.build();
+        }
+
+        protected abstract ConcurrentSkipListSet<DecoratedKey> get(String value);
+        protected abstract Iterable<ConcurrentSkipListSet<DecoratedKey>> search(String value);
+        protected abstract ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> key);
+    }
+
+    protected static class ConcurrentPrefixTrie extends ConcurrentTrie
+    {
+        private final ConcurrentRadixTree<ConcurrentSkipListSet<DecoratedKey>> trie;
+
+        private ConcurrentPrefixTrie(ColumnDefinition column)
+        {
+            super(column);
+            trie = new ConcurrentRadixTree<>(NODE_FACTORY);
+        }
+
+        public ConcurrentSkipListSet<DecoratedKey> get(String value)
+        {
+            return trie.getValueForExactKey(value);
+        }
+
+        public ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> newKeys)
+        {
+            return trie.putIfAbsent(value, newKeys);
+        }
+
+        public Iterable<ConcurrentSkipListSet<DecoratedKey>> search(String value)
+        {
+            return trie.getValuesForKeysStartingWith(value);
+        }
+    }
+
+    protected static class ConcurrentSuffixTrie extends ConcurrentTrie
+    {
+        private final ConcurrentSuffixTree<ConcurrentSkipListSet<DecoratedKey>> trie;
+
+        private ConcurrentSuffixTrie(ColumnDefinition column)
+        {
+            super(column);
+            trie = new ConcurrentSuffixTree<>(NODE_FACTORY);
+        }
+
+        public ConcurrentSkipListSet<DecoratedKey> get(String value)
+        {
+            return trie.getValueForExactKey(value);
+        }
+
+        public ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> newKeys)
+        {
+            return trie.putIfAbsent(value, newKeys);
+        }
+
+        public Iterable<ConcurrentSkipListSet<DecoratedKey>> search(String value)
+        {
+            return trie.getValuesForKeysContaining(value);
+        }
+    }
+
+    // This relies on the fact that all of the tree updates are done under an exclusive write lock.
+    // The method may overestimate in certain circumstances, e.g. when nodes are replaced in place,
+    // but overestimating is still better than underestimating, since it leaves more breathing room
+    // for other memory users.
+    private static class SizeEstimatingNodeFactory extends SmartArrayBasedNodeFactory
+    {
+        private final ThreadLocal<Long> updateSize = ThreadLocal.withInitial(() -> 0L);
+
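+        // invoked by the trie for every node allocated during an update; sizes are
+        // accumulated per thread and read back (then reset) by ConcurrentTrie#add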
+        public Node createNode(CharSequence edgeCharacters, Object value, List<Node> childNodes, boolean isRoot)
+        {
+            Node node = super.createNode(edgeCharacters, value, childNodes, isRoot);
+            updateSize.set(updateSize.get() + measure(node));
+            return node;
+        }
+
+        public long currentUpdateSize()
+        {
+            return updateSize.get();
+        }
+
+        public void reset()
+        {
+            updateSize.set(0L);
+        }
+
+        private long measure(Node node)
+        {
+            // node with max overhead is CharArrayNodeLeafWithValue = 24B
+            long overhead = 24;
+
+            // array of chars (2 bytes) + CharSequence overhead
+            overhead += 24 + node.getIncomingEdge().length() * 2;
+
+            if (node.getOutgoingEdges() != null)
+            {
+                // 16 bytes for AtomicReferenceArray
+                overhead += 16;
+                overhead += 24 * node.getOutgoingEdges().size();
+            }
+
+            return overhead;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/plan/Expression.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/Expression.java b/src/java/org/apache/cassandra/index/sasi/plan/Expression.java
new file mode 100644
index 0000000..e215ec7
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/plan/Expression.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.plan;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndex;
+import org.apache.cassandra.index.sasi.utils.TypeUtil;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Expression
+{
+    private static final Logger logger = LoggerFactory.getLogger(Expression.class);
+
+    public enum Op
+    {
+        EQ, NOT_EQ, RANGE
+    }
+
+    private final QueryController controller;
+
+    public final AbstractAnalyzer analyzer;
+
+    public final ColumnIndex index;
+    public final AbstractType<?> validator;
+    public final boolean isLiteral;
+
+    @VisibleForTesting
+    protected Op operation;
+
+    public Bound lower, upper;
+    public List<ByteBuffer> exclusions = new ArrayList<>();
+
+    public Expression(Expression other)
+    {
+        this(other.controller, other.index);
+        operation = other.operation;
+    }
+
+    public Expression(QueryController controller, ColumnIndex columnIndex)
+    {
+        this.controller = controller;
+        this.index = columnIndex;
+        this.analyzer = columnIndex.getAnalyzer();
+        this.validator = columnIndex.getValidator();
+        this.isLiteral = columnIndex.isLiteral();
+    }
+
+    @VisibleForTesting
+    public Expression(String name, AbstractType<?> validator)
+    {
+        this(null, new ColumnIndex(UTF8Type.instance, ColumnDefinition.regularDef("sasi", "internal", name, validator), null));
+    }
+
+    public Expression setLower(Bound newLower)
+    {
+        lower = newLower == null ? null : new Bound(newLower.value, newLower.inclusive);
+        return this;
+    }
+
+    public Expression setUpper(Bound newUpper)
+    {
+        upper = newUpper == null ? null : new Bound(newUpper.value, newUpper.inclusive);
+        return this;
+    }
+
+    public Expression setOp(Op op)
+    {
+        this.operation = op;
+        return this;
+    }
+
+    public Expression add(Operator op, ByteBuffer value)
+    {
+        boolean lowerInclusive = false, upperInclusive = false;
+        switch (op)
+        {
+            case EQ:
+                lower = new Bound(value, true);
+                upper = lower;
+                operation = Op.EQ;
+                break;
+
+            case NEQ:
+                // index expressions are priority-sorted and NOT_EQ has the lowest
+                // priority, so for RANGE or EQ the operation type is always
+                // set before this point is reached.
+                if (operation == null)
+                {
+                    operation = Op.NOT_EQ;
+                    lower = new Bound(value, true);
+                    upper = lower;
+                }
+                else
+                    exclusions.add(value);
+                break;
+
+            case LTE:
+                upperInclusive = true;
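+                // intentional fall-through: LT records the upper bound for both operators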
+            case LT:
+                operation = Op.RANGE;
+                upper = new Bound(value, upperInclusive);
+                break;
+
+            case GTE:
+                lowerInclusive = true;
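+                // intentional fall-through: GT records the lower bound for both operators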
+            case GT:
+                operation = Op.RANGE;
+                lower = new Bound(value, lowerInclusive);
+                break;
+        }
+
+        return this;
+    }
+
+    public Expression addExclusion(ByteBuffer value)
+    {
+        exclusions.add(value);
+        return this;
+    }
+
+    public boolean contains(ByteBuffer value)
+    {
+        if (!TypeUtil.isValid(value, validator))
+        {
+            int size = value.remaining();
+            if ((value = TypeUtil.tryUpcast(value, validator)) == null)
+            {
+                logger.error("Can't cast value for {} to size accepted by {}, value size is {} bytes.",
+                             index.getColumnName(),
+                             validator,
+                             size);
+                return false;
+            }
+        }
+
+        if (lower != null)
+        {
+            // suffix check
+            if (isLiteral)
+            {
+                if (!validateStringValue(value, lower.value))
+                    return false;
+            }
+            else
+            {
+                // range or (not-)equals - (mainly) for numeric values
+                int cmp = validator.compare(lower.value, value);
+
+                // in case of (NOT_)EQ lower == upper
+                if (operation == Op.EQ || operation == Op.NOT_EQ)
+                    return cmp == 0;
+
+                if (cmp > 0 || (cmp == 0 && !lower.inclusive))
+                    return false;
+            }
+        }
+
+        if (upper != null && lower != upper)
+        {
+            // string (prefix or suffix) check
+            if (isLiteral)
+            {
+                if (!validateStringValue(value, upper.value))
+                    return false;
+            }
+            else
+            {
+                // range - mainly for numeric values
+                int cmp = validator.compare(upper.value, value);
+                if (cmp < 0 || (cmp == 0 && !upper.inclusive))
+                    return false;
+            }
+        }
+
+        // as a last step let's check exclusions for the given field,
+        // this covers EQ/RANGE with exclusions.
+        for (ByteBuffer term : exclusions)
+        {
+            if (isLiteral && validateStringValue(value, term))
+                return false;
+            else if (validator.compare(term, value) == 0)
+                return false;
+        }
+
+        return true;
+    }
+
+    private boolean validateStringValue(ByteBuffer columnValue, ByteBuffer requestedValue)
+    {
+        analyzer.reset(columnValue.duplicate());
+        while (analyzer.hasNext())
+        {
+            ByteBuffer term = analyzer.next();
+            if (ByteBufferUtil.contains(term, requestedValue))
+                return true;
+        }
+
+        return false;
+    }
+
+    public Op getOp()
+    {
+        return operation;
+    }
+
+    public void checkpoint()
+    {
+        if (controller == null)
+            return;
+
+        controller.checkpoint();
+    }
+
+    public boolean hasLower()
+    {
+        return lower != null;
+    }
+
+    public boolean hasUpper()
+    {
+        return upper != null;
+    }
+
+    public boolean isLowerSatisfiedBy(OnDiskIndex.DataTerm term)
+    {
+        if (!hasLower())
+            return true;
+
+        int cmp = term.compareTo(validator, lower.value, false);
+        return cmp > 0 || cmp == 0 && lower.inclusive;
+    }
+
+    public boolean isUpperSatisfiedBy(OnDiskIndex.DataTerm term)
+    {
+        if (!hasUpper())
+            return true;
+
+        int cmp = term.compareTo(validator, upper.value, false);
+        return cmp < 0 || cmp == 0 && upper.inclusive;
+    }
+
+    public boolean isIndexed()
+    {
+        return index.isIndexed();
+    }
+
+    public String toString()
+    {
+        return String.format("Expression{name: %s, op: %s, lower: (%s, %s), upper: (%s, %s), exclusions: %s}",
+                             index.getColumnName(),
+                             operation,
+                             lower == null ? "null" : validator.getString(lower.value),
+                             lower != null && lower.inclusive,
+                             upper == null ? "null" : validator.getString(upper.value),
+                             upper != null && upper.inclusive,
+                             Iterators.toString(Iterators.transform(exclusions.iterator(), validator::getString)));
+    }
+
+    public int hashCode()
+    {
+        return new HashCodeBuilder().append(index.getColumnName())
+                                    .append(operation)
+                                    .append(validator)
+                                    .append(lower).append(upper)
+                                    .append(exclusions).build();
+    }
+
+    public boolean equals(Object other)
+    {
+        if (!(other instanceof Expression))
+            return false;
+
+        if (this == other)
+            return true;
+
+        Expression o = (Expression) other;
+
+        return Objects.equals(index.getColumnName(), o.index.getColumnName())
+                && validator.equals(o.validator)
+                && operation == o.operation
+                && Objects.equals(lower, o.lower)
+                && Objects.equals(upper, o.upper)
+                && exclusions.equals(o.exclusions);
+    }
+
+    public static class Bound
+    {
+        public final ByteBuffer value;
+        public final boolean inclusive;
+
+        public Bound(ByteBuffer value, boolean inclusive)
+        {
+            this.value = value;
+            this.inclusive = inclusive;
+        }
+
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof Bound))
+                return false;
+
+            Bound o = (Bound) other;
+            return value.equals(o.value) && inclusive == o.inclusive;
+        }
+    }
+}