You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2016/11/17 23:23:48 UTC

[3/3] cassandra git commit: Revert "Add row offset support to SASI"

Revert "Add row offset support to SASI"

This reverts commit 7d857b46fb070548bf5e5f6ff81db588f08ec22a.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/490c1c27
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/490c1c27
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/490c1c27

Branch: refs/heads/cassandra-3.X
Commit: 490c1c27c9b700f14212d9591a516ddb8d0865c7
Parents: a1eef56
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Thu Nov 17 15:20:04 2016 -0800
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Thu Nov 17 15:20:04 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 -
 .../org/apache/cassandra/db/ColumnIndex.java    |   6 +-
 .../apache/cassandra/index/sasi/KeyFetcher.java |  98 -------
 .../apache/cassandra/index/sasi/SASIIndex.java  |  11 +-
 .../cassandra/index/sasi/SASIIndexBuilder.java  |  13 +-
 .../cassandra/index/sasi/SSTableIndex.java      |  41 ++-
 .../cassandra/index/sasi/conf/ColumnIndex.java  |   4 +-
 .../index/sasi/conf/view/RangeTermTree.java     |   4 -
 .../sasi/disk/AbstractTokenTreeBuilder.java     | 276 ++++++++----------
 .../cassandra/index/sasi/disk/Descriptor.java   |  33 +--
 .../sasi/disk/DynamicTokenTreeBuilder.java      |  59 ++--
 .../cassandra/index/sasi/disk/KeyOffsets.java   | 115 --------
 .../cassandra/index/sasi/disk/OnDiskIndex.java  |  12 +-
 .../index/sasi/disk/OnDiskIndexBuilder.java     |  16 +-
 .../index/sasi/disk/PerSSTableIndexWriter.java  |  13 +-
 .../cassandra/index/sasi/disk/RowKey.java       | 108 -------
 .../index/sasi/disk/StaticTokenTreeBuilder.java |  18 +-
 .../apache/cassandra/index/sasi/disk/Token.java |   9 +-
 .../cassandra/index/sasi/disk/TokenTree.java    | 288 +++++++------------
 .../index/sasi/disk/TokenTreeBuilder.java       |  72 ++---
 .../index/sasi/memory/IndexMemtable.java        |   8 +-
 .../index/sasi/memory/KeyRangeIterator.java     |  49 ++--
 .../cassandra/index/sasi/memory/MemIndex.java   |   4 +-
 .../index/sasi/memory/SkipListMemIndex.java     |  12 +-
 .../index/sasi/memory/TrieMemIndex.java         |  45 +--
 .../index/sasi/plan/QueryController.java        |  49 ++--
 .../cassandra/index/sasi/plan/QueryPlan.java    | 174 +++--------
 .../io/sstable/format/SSTableFlushObserver.java |   5 -
 .../io/sstable/format/SSTableReader.java        |  33 +--
 .../io/sstable/format/big/BigTableWriter.java   |   8 +-
 .../org/apache/cassandra/utils/obs/BitUtil.java |   2 +-
 test/data/legacy-sasi/on-disk-sa-int2.db        | Bin 12312 -> 0 bytes
 .../cassandra/index/sasi/SASIIndexTest.java     |  25 +-
 .../index/sasi/disk/KeyOffsetsTest.java         |  48 ----
 .../index/sasi/disk/OnDiskIndexTest.java        | 216 +++++++-------
 .../sasi/disk/PerSSTableIndexWriterTest.java    | 112 ++------
 .../index/sasi/disk/TokenTreeTest.java          | 208 +++++++-------
 .../index/sasi/plan/OperationTest.java          |   2 +-
 .../index/sasi/utils/KeyConverter.java          |  69 -----
 .../index/sasi/utils/LongIterator.java          |   8 +-
 .../sasi/utils/RangeUnionIteratorTest.java      |  17 --
 41 files changed, 745 insertions(+), 1546 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index db06341..6ca26f9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -45,7 +45,6 @@
  * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
  * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
  * Add JMH benchmarks.jar (CASSANDRA-12586)
- * Add row offset support to SASI (CASSANDRA-11990)
  * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
  * Add keep-alive to streaming (CASSANDRA-11841)
  * Tracing payload is passed through newSession(..) (CASSANDRA-11706)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 8ea1272..de1b1df 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -121,10 +121,9 @@ public class ColumnIndex
         {
             Row staticRow = iterator.staticRow();
 
-            long startPosition = currentPosition();
             UnfilteredSerializer.serializer.serializeStaticRow(staticRow, header, writer, version);
             if (!observers.isEmpty())
-                observers.forEach((o) -> o.nextUnfilteredCluster(staticRow, startPosition));
+                observers.forEach((o) -> o.nextUnfilteredCluster(staticRow));
         }
     }
 
@@ -235,7 +234,6 @@ public class ColumnIndex
 
     private void add(Unfiltered unfiltered) throws IOException
     {
-        final long origPos = writer.position();
         long pos = currentPosition();
 
         if (firstClustering == null)
@@ -249,7 +247,7 @@ public class ColumnIndex
 
         // notify observers about each new row
         if (!observers.isEmpty())
-            observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered, origPos));
+            observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered));
 
         lastClustering = unfiltered.clustering();
         previousRowStart = pos;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java b/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java
deleted file mode 100644
index 80ee167..0000000
--- a/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.index.sasi;
-
-import java.io.IOException;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.index.sasi.disk.*;
-import org.apache.cassandra.io.*;
-import org.apache.cassandra.io.sstable.format.*;
-
-
-public interface KeyFetcher
-{
-    public Clustering getClustering(long offset);
-    public DecoratedKey getPartitionKey(long offset);
-
-    public RowKey getRowKey(long partitionOffset, long rowOffset);
-
-    /**
-     * Fetches clustering and partition key from the sstable.
-     *
-     * Currently, clustering key is fetched from the data file of the sstable and partition key is
-     * read from the index file. Reading from index file helps us to warm up key cache in this case.
-     */
-    public static class SSTableKeyFetcher implements KeyFetcher
-    {
-        private final SSTableReader sstable;
-
-        public SSTableKeyFetcher(SSTableReader reader)
-        {
-            sstable = reader;
-        }
-
-        @Override
-        public Clustering getClustering(long offset)
-        {
-            try
-            {
-                return sstable.clusteringAt(offset);
-            }
-            catch (IOException e)
-            {
-                throw new FSReadError(new IOException("Failed to read clustering from " + sstable.descriptor, e), sstable.getFilename());
-            }
-        }
-
-        @Override
-        public DecoratedKey getPartitionKey(long offset)
-        {
-            try
-            {
-                return sstable.keyAt(offset);
-            }
-            catch (IOException e)
-            {
-                throw new FSReadError(new IOException("Failed to read key from " + sstable.descriptor, e), sstable.getFilename());
-            }
-        }
-
-        @Override
-        public RowKey getRowKey(long partitionOffset, long rowOffset)
-        {
-            if (rowOffset == KeyOffsets.NO_OFFSET)
-                return new RowKey(getPartitionKey(partitionOffset), null, sstable.metadata.comparator);
-            else
-                return new RowKey(getPartitionKey(partitionOffset), getClustering(rowOffset), sstable.metadata.comparator);
-        }
-
-        public int hashCode()
-        {
-            return sstable.descriptor.hashCode();
-        }
-
-        public boolean equals(Object other)
-        {
-            return other instanceof SSTableKeyFetcher
-                   && sstable.descriptor.equals(((SSTableKeyFetcher) other).sstable.descriptor);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index 65953a9..4375964 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@ -46,7 +46,6 @@ import org.apache.cassandra.index.sasi.conf.ColumnIndex;
 import org.apache.cassandra.index.sasi.conf.IndexMode;
 import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder.Mode;
 import org.apache.cassandra.index.sasi.disk.PerSSTableIndexWriter;
-import org.apache.cassandra.index.sasi.disk.RowKey;
 import org.apache.cassandra.index.sasi.plan.QueryPlan;
 import org.apache.cassandra.index.transactions.IndexTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -183,14 +182,6 @@ public class SASIIndex implements Index, INotificationConsumer
         return getTruncateTask(FBUtilities.timestampMicros());
     }
 
-    public Callable<?> getTruncateTask(Collection<SSTableReader> sstablesToRebuild)
-    {
-        return () -> {
-            index.dropData(sstablesToRebuild);
-            return null;
-        };
-    }
-
     public Callable<?> getTruncateTask(long truncatedAt)
     {
         return () -> {
@@ -261,7 +252,7 @@ public class SASIIndex implements Index, INotificationConsumer
             public void insertRow(Row row)
             {
                 if (isNewData())
-                    adjustMemtableSize(index.index(new RowKey(key, row.clustering(), baseCfs.getComparator()), row), opGroup);
+                    adjustMemtableSize(index.index(key, row), opGroup);
             }
 
             public void updateRow(Row oldRow, Row newRow)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
index d6706ea..d50875a 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
@@ -94,23 +94,16 @@ class SASIIndexBuilder extends SecondaryIndexBuilder
                         {
                             RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
                             dataFile.seek(indexEntry.position);
-                            int staticOffset = ByteBufferUtil.readWithShortLength(dataFile).remaining(); // key
+                            ByteBufferUtil.readWithShortLength(dataFile); // key
 
                             try (SSTableIdentityIterator partition = SSTableIdentityIterator.create(sstable, dataFile, key))
                             {
                                 // if the row has statics attached, it has to be indexed separately
                                 if (cfs.metadata.hasStaticColumns())
-                                {
-                                    long staticPosition = indexEntry.position + staticOffset;
-                                    indexWriter.nextUnfilteredCluster(partition.staticRow(), staticPosition);
-                                }
+                                    indexWriter.nextUnfilteredCluster(partition.staticRow());
 
-                                long position = dataFile.getPosition();
                                 while (partition.hasNext())
-                                {
-                                    indexWriter.nextUnfilteredCluster(partition.next(), position);
-                                    position = dataFile.getPosition();
-                                }
+                                    indexWriter.nextUnfilteredCluster(partition.next());
                             }
                         }
                         catch (IOException ex)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
index f9c8abf..c67c39c 100644
--- a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
@@ -18,22 +18,28 @@
 package org.apache.cassandra.index.sasi;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.index.sasi.conf.ColumnIndex;
-import org.apache.cassandra.index.sasi.disk.*;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndex;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.concurrent.Ref;
 
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
+import com.google.common.base.Function;
+
 public class SSTableIndex
 {
     private final ColumnIndex columnIndex;
@@ -59,7 +65,7 @@ public class SSTableIndex
                 sstable.getFilename(),
                 columnIndex.getIndexName());
 
-        this.index = new OnDiskIndex(indexFile, validator, new KeyFetcher.SSTableKeyFetcher(sstable));
+        this.index = new OnDiskIndex(indexFile, validator, new DecoratedKeyFetcher(sstable));
     }
 
     public OnDiskIndexBuilder.Mode mode()
@@ -157,5 +163,36 @@ public class SSTableIndex
         return String.format("SSTableIndex(column: %s, SSTable: %s)", columnIndex.getColumnName(), sstable.descriptor);
     }
 
+    private static class DecoratedKeyFetcher implements Function<Long, DecoratedKey>
+    {
+        private final SSTableReader sstable;
+
+        DecoratedKeyFetcher(SSTableReader reader)
+        {
+            sstable = reader;
+        }
+
+        public DecoratedKey apply(Long offset)
+        {
+            try
+            {
+                return sstable.keyAt(offset);
+            }
+            catch (IOException e)
+            {
+                throw new FSReadError(new IOException("Failed to read key from " + sstable.descriptor, e), sstable.getFilename());
+            }
+        }
+
+        public int hashCode()
+        {
+            return sstable.descriptor.hashCode();
+        }
 
+        public boolean equals(Object other)
+        {
+            return other instanceof DecoratedKeyFetcher
+                    && sstable.descriptor.equals(((DecoratedKeyFetcher) other).sstable.descriptor);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
index 459e5c3..0958113 100644
--- a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
@@ -30,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.AsciiType;
@@ -39,7 +40,6 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
 import org.apache.cassandra.index.sasi.conf.view.View;
 import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
-import org.apache.cassandra.index.sasi.disk.RowKey;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.memory.IndexMemtable;
 import org.apache.cassandra.index.sasi.plan.Expression;
@@ -99,7 +99,7 @@ public class ColumnIndex
         return keyValidator;
     }
 
-    public long index(RowKey key, Row row)
+    public long index(DecoratedKey key, Row row)
     {
         return getCurrentMemtable().index(key, getValueOf(column, row, FBUtilities.nowInSeconds()));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
index 2775c29..d6b4551 100644
--- a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
+++ b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.index.sasi.conf.view;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -47,9 +46,6 @@ public class RangeTermTree implements TermTree
 
     public Set<SSTableIndex> search(Expression e)
     {
-        if (e == null)
-            return Collections.emptySet();
-
         ByteBuffer minTerm = e.lower == null ? min : e.lower.value;
         ByteBuffer maxTerm = e.upper == null ? max : e.upper.value;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
index 9245960..18994de 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
@@ -20,18 +20,19 @@ package org.apache.cassandra.index.sasi.disk;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.function.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
-import com.carrotsearch.hppc.LongArrayList;
-import com.carrotsearch.hppc.cursors.LongCursor;
-import com.carrotsearch.hppc.cursors.LongObjectCursor;
-import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
+import com.carrotsearch.hppc.LongArrayList;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
+
 public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
 {
     protected int numBlocks;
@@ -64,7 +65,7 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
     public int serializedSize()
     {
         if (numBlocks == 1)
-            return BLOCK_HEADER_BYTES + ((int) tokenCount * LEAF_ENTRY_BYTES);
+            return (BLOCK_HEADER_BYTES + ((int) tokenCount * 16));
         else
             return numBlocks * BLOCK_BYTES;
     }
@@ -111,15 +112,6 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
         buffer.clear();
     }
 
-    /**
-     * Tree node,
-     *
-     * B+-tree consists of root, interior nodes and leaves. Root can be either a node or a leaf.
-     *
-     * Depending on the concrete implementation of {@code TokenTreeBuilder}
-     * leaf can be partial or static (in case of {@code StaticTokenTreeBuilder} or dynamic in case
-     * of {@code DynamicTokenTreeBuilder}
-     */
     protected abstract class Node
     {
         protected InteriorNode parent;
@@ -187,16 +179,8 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
             alignBuffer(buf, BLOCK_HEADER_BYTES);
         }
 
-        /**
-         * Shared header part, written for all node types:
-         *     [  info byte  ] [  token count   ] [ min node token ] [ max node token ]
-         *     [      1b     ] [    2b (short)  ] [   8b (long)    ] [    8b (long)   ]
-         **/
         private abstract class Header
         {
-            /**
-             * Serializes the shared part of the header
-             */
             public void serialize(ByteBuffer buf)
             {
                 buf.put(infoByte())
@@ -208,12 +192,6 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
             protected abstract byte infoByte();
         }
 
-        /**
-         * In addition to shared header part, root header stores version information,
-         * overall token count and min/max tokens for the whole tree:
-         *     [      magic    ] [  overall token count  ] [ min tree token ] [ max tree token ]
-         *     [   2b (short)  ] [         8b (long)     ] [   8b (long)    ] [    8b (long)   ]
-         */
         private class RootHeader extends Header
         {
             public void serialize(ByteBuffer buf)
@@ -229,21 +207,19 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
             {
                 // if leaf, set leaf indicator and last leaf indicator (bits 0 & 1)
                 // if not leaf, clear both bits
-                return isLeaf() ? ENTRY_TYPE_MASK : 0;
+                return (byte) ((isLeaf()) ? 3 : 0);
             }
 
             protected void writeMagic(ByteBuffer buf)
             {
                 switch (Descriptor.CURRENT_VERSION)
                 {
-                    case ab:
+                    case Descriptor.VERSION_AB:
                         buf.putShort(AB_MAGIC);
                         break;
-                    case ac:
-                        buf.putShort(AC_MAGIC);
-                        break;
+
                     default:
-                        throw new RuntimeException("Unsupported version");
+                        break;
                 }
 
             }
@@ -273,12 +249,6 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
 
     }
 
-    /**
-     * Leaf consists of
-     *   - header (format described in {@code Header} )
-     *   - data (format described in {@code LeafEntry})
-     *   - overflow collision entries, that hold {@value OVERFLOW_TRAILER_CAPACITY} of {@code RowOffset}.
-     */
     protected abstract class Leaf extends Node
     {
         protected LongArrayList overflowCollisions;
@@ -309,98 +279,82 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
 
         protected abstract void serializeData(ByteBuffer buf);
 
-        protected LeafEntry createEntry(final long tok, final KeyOffsets offsets)
+        protected LeafEntry createEntry(final long tok, final LongSet offsets)
         {
-            LongArrayList rawOffsets = new LongArrayList(offsets.size());
-
-            offsets.forEach(new Consumer<LongObjectCursor<long[]>>()
-            {
-                public void accept(LongObjectCursor<long[]> cursor)
-                {
-                    for (long l : cursor.value)
-                    {
-                        rawOffsets.add(cursor.key);
-                        rawOffsets.add(l);
-                    }
-                }
-            });
-
-            int offsetCount = rawOffsets.size();
+            int offsetCount = offsets.size();
             switch (offsetCount)
             {
                 case 0:
                     throw new AssertionError("no offsets for token " + tok);
+                case 1:
+                    long offset = offsets.toArray()[0];
+                    if (offset > MAX_OFFSET)
+                        throw new AssertionError("offset " + offset + " cannot be greater than " + MAX_OFFSET);
+                    else if (offset <= Integer.MAX_VALUE)
+                        return new SimpleLeafEntry(tok, offset);
+                    else
+                        return new FactoredOffsetLeafEntry(tok, offset);
                 case 2:
-                    return new SimpleLeafEntry(tok, rawOffsets.get(0), rawOffsets.get(1));
+                    long[] rawOffsets = offsets.toArray();
+                    if (rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= Integer.MAX_VALUE &&
+                        (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= Short.MAX_VALUE))
+                        return new PackedCollisionLeafEntry(tok, rawOffsets);
+                    else
+                        return createOverflowEntry(tok, offsetCount, offsets);
                 default:
-                    assert offsetCount % 2 == 0;
-                    if (offsetCount == 4)
-                    {
-                        if (rawOffsets.get(0) < Integer.MAX_VALUE && rawOffsets.get(1) < Integer.MAX_VALUE &&
-                            rawOffsets.get(2) < Integer.MAX_VALUE && rawOffsets.get(3) < Integer.MAX_VALUE)
-                        {
-                            return new PackedCollisionLeafEntry(tok, (int)rawOffsets.get(0), (int) rawOffsets.get(1),
-                                                                (int) rawOffsets.get(2), (int) rawOffsets.get(3));
-                        }
-                    }
-                    return createOverflowEntry(tok, offsetCount, rawOffsets);
+                    return createOverflowEntry(tok, offsetCount, offsets);
             }
         }
 
-        private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongArrayList offsets)
+        private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongSet offsets)
         {
             if (overflowCollisions == null)
-                overflowCollisions = new LongArrayList(offsetCount);
-
-            int overflowCount = (overflowCollisions.size() + offsetCount) / 2;
-            if (overflowCount >= OVERFLOW_TRAILER_CAPACITY)
-                throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf, but had: " + overflowCount);
+                overflowCollisions = new LongArrayList();
 
-            LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) (overflowCollisions.size() / 2), (short) (offsetCount / 2));
-            overflowCollisions.addAll(offsets);
+            LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) overflowCollisions.size(), (short) offsetCount);
+            for (LongCursor o : offsets)
+            {
+                if (overflowCollisions.size() == OVERFLOW_TRAILER_CAPACITY)
+                    throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf");
+                else
+                    overflowCollisions.add(o.value);
+            }
             return entry;
         }
 
-        /**
-         * A leaf of the B+-Tree, that holds information about the row offset(s) for
-         * the current token.
-         *
-         * Main 3 types of leaf entries are:
-         *   1) simple leaf entry: holding just a single row offset
-         *   2) packed collision leaf entry: holding two entries that would fit together into 168 bytes
-         *   3) overflow entry: only holds offset in overflow trailer and amount of entries belonging to this leaf
-         */
         protected abstract class LeafEntry
         {
             protected final long token;
 
             abstract public EntryType type();
+            abstract public int offsetData();
+            abstract public short offsetExtra();
 
             public LeafEntry(final long tok)
             {
                 token = tok;
             }
 
-            public abstract void serialize(ByteBuffer buf);
+            public void serialize(ByteBuffer buf)
+            {
+                buf.putShort((short) type().ordinal())
+                   .putShort(offsetExtra())
+                   .putLong(token)
+                   .putInt(offsetData());
+            }
 
         }
 
-        /**
-         * Simple leaf, that can store a single row offset, having the following format:
-         *
-         *     [    type    ] [   token   ] [ partition offset ] [ row offset ]
-         *     [ 2b (short) ] [ 8b (long) ] [     8b (long)    ] [  8b (long) ]
-         */
+
+        // assumes there is a single offset and the offset is <= Integer.MAX_VALUE
         protected class SimpleLeafEntry extends LeafEntry
         {
-            private final long partitionOffset;
-            private final long rowOffset;
+            private final long offset;
 
-            public SimpleLeafEntry(final long tok, final long partitionOffset, final long rowOffset)
+            public SimpleLeafEntry(final long tok, final long off)
             {
                 super(tok);
-                this.partitionOffset = partitionOffset;
-                this.rowOffset = rowOffset;
+                offset = off;
             }
 
             public EntryType type()
@@ -408,38 +362,61 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
                 return EntryType.SIMPLE;
             }
 
-            @Override
-            public void serialize(ByteBuffer buf)
+            public int offsetData()
             {
-                buf.putShort((short) type().ordinal())
-                   .putLong(token)
-                   .putLong(partitionOffset)
-                   .putLong(rowOffset);
+                return (int) offset;
+            }
+
+            public short offsetExtra()
+            {
+                return 0;
             }
         }
 
-        /**
-         * Packed collision entry, can store two offsets, if each one of their positions
-         * fit into 4 bytes.
-         *     [    type    ] [   token   ] [ partition offset 1 ] [ row offset  1] [ partition offset 1 ] [ row offset  1]
-         *     [ 2b (short) ] [ 8b (long) ] [      4b (int)      ] [    4b (int)  ] [      4b (int)      ] [    4b (int)  ]
-         */
-        protected class PackedCollisionLeafEntry extends LeafEntry
+        // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET
+        // take the middle 32 bits of offset (or the top 32 when considering offset is max 48 bits)
+        // and store where offset is normally stored. take bottom 16 bits of offset and store in entry header
+        private class FactoredOffsetLeafEntry extends LeafEntry
         {
-            private final int partitionOffset1;
-            private final int rowOffset1;
-            private final int partitionOffset2;
-            private final int rowOffset2;
+            private final long offset;
 
-            public PackedCollisionLeafEntry(final long tok, final int partitionOffset1, final int rowOffset1,
-                                            final int partitionOffset2, final int rowOffset2)
+            public FactoredOffsetLeafEntry(final long tok, final long off)
             {
                 super(tok);
-                this.partitionOffset1 = partitionOffset1;
-                this.rowOffset1 = rowOffset1;
-                this.partitionOffset2 = partitionOffset2;
-                this.rowOffset2 = rowOffset2;
+                offset = off;
+            }
 
+            public EntryType type()
+            {
+                return EntryType.FACTORED;
+            }
+
+            public int offsetData()
+            {
+                return (int) (offset >>> Short.SIZE);
+            }
+
+            public short offsetExtra()
+            {
+                // extra offset is supposed to be an unsigned 16-bit integer
+                return (short) offset;
+            }
+        }
+
+        // holds an entry with two offsets that can be packed in an int & a short
+        // the int offset is stored where offset is normally stored. short offset is
+        // stored in entry header
+        private class PackedCollisionLeafEntry extends LeafEntry
+        {
+            private short smallerOffset;
+            private int largerOffset;
+
+            public PackedCollisionLeafEntry(final long tok, final long[] offs)
+            {
+                super(tok);
+
+                smallerOffset = (short) Math.min(offs[0], offs[1]);
+                largerOffset = (int) Math.max(offs[0], offs[1]);
             }
 
             public EntryType type()
@@ -447,27 +424,21 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
                 return EntryType.PACKED;
             }
 
-            @Override
-            public void serialize(ByteBuffer buf)
+            public int offsetData()
             {
-                buf.putShort((short) type().ordinal())
-                   .putLong(token)
-                   .putInt(partitionOffset1)
-                   .putInt(rowOffset1)
-                   .putInt(partitionOffset2)
-                   .putInt(rowOffset2);
-            }
-        }
-
-        /**
-         * Overflow collision entry, holds an entry with three or more offsets, or two offsets
-         * that cannot be packed into 16 bytes.
-         *     [    type    ] [   token   ] [ start index ] [    count    ]
-         *     [ 2b (short) ] [ 8b (long) ] [   8b (long) ] [  8b (long)  ]
-         *
-         *   - [ start index ] is a position of first item belonging to this leaf entry in the overflow trailer
-         *   - [ count ] is the amount of items belonging to this leaf entry that are stored in the overflow trailer
-         */
+                return largerOffset;
+            }
+
+            public short offsetExtra()
+            {
+                return smallerOffset;
+            }
+        }
+
+        // holds an entry with three or more offsets, or two offsets that cannot
+        // be packed into an int & a short. the index into the overflow list
+        // is stored where the offset is normally stored. the number of overflowed offsets
+        // for the entry is stored in the entry header
         private class OverflowCollisionLeafEntry extends LeafEntry
         {
             private final short startIndex;
@@ -485,23 +456,20 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
                 return EntryType.OVERFLOW;
             }
 
-            @Override
-            public void serialize(ByteBuffer buf)
+            public int offsetData()
             {
-                buf.putShort((short) type().ordinal())
-                   .putLong(token)
-                   .putLong(startIndex)
-                   .putLong(count);
+                return startIndex;
             }
+
+            public short offsetExtra()
+            {
+                return count;
+            }
+
         }
+
     }
 
-    /**
-     * Interior node consists of:
-     *    - (interior node) header
-     *    - tokens (serialized as longs, with count stored in header)
-     *    - child offsets
-     */
     protected class InteriorNode extends Node
     {
         protected List<Long> tokens = new ArrayList<>(TOKENS_PER_BLOCK);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
index 3fa0f06..3aa6f14 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
@@ -22,29 +22,30 @@ package org.apache.cassandra.index.sasi.disk;
  */
 public class Descriptor
 {
-    public static enum Version
+    public static final String VERSION_AA = "aa";
+    public static final String VERSION_AB = "ab";
+    public static final String CURRENT_VERSION = VERSION_AB;
+    public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION);
+
+    public static class Version
     {
-        aa,
-        ab,
-        ac
-    }
+        public final String version;
 
-    public static final Version VERSION_AA = Version.aa;
-    public static final Version VERSION_AB = Version.ab;
-    public static final Version VERSION_AC = Version.ac;
+        public Version(String version)
+        {
+            this.version = version;
+        }
 
-    public static final Version CURRENT_VERSION = Version.ac;
-    public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION);
+        public String toString()
+        {
+            return version;
+        }
+    }
 
     public final Version version;
 
     public Descriptor(String v)
     {
-        this.version = Version.valueOf(v);
-    }
-
-    public Descriptor(Version v)
-    {
-        this.version = v;
+        this.version = new Version(v);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
index 6e3e163..2ddfd89 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
@@ -20,14 +20,17 @@ package org.apache.cassandra.index.sasi.disk;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.carrotsearch.hppc.cursors.LongObjectCursor;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.Pair;
+
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
 
 public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
 {
+    private final SortedMap<Long, LongSet> tokens = new TreeMap<>();
 
-    private final SortedMap<Long, KeyOffsets> tokens = new TreeMap<>();
 
     public DynamicTokenTreeBuilder()
     {}
@@ -37,52 +40,54 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
         add(data);
     }
 
-    public DynamicTokenTreeBuilder(SortedMap<Long, KeyOffsets> data)
+    public DynamicTokenTreeBuilder(SortedMap<Long, LongSet> data)
     {
         add(data);
     }
 
-    public void add(Long token, long partitionOffset, long rowOffset)
+    public void add(Long token, long keyPosition)
     {
-        KeyOffsets found = tokens.get(token);
+        LongSet found = tokens.get(token);
         if (found == null)
-            tokens.put(token, (found = new KeyOffsets(2)));
+            tokens.put(token, (found = new LongOpenHashSet(2)));
 
-        found.put(partitionOffset, rowOffset);
+        found.add(keyPosition);
     }
 
-    public void add(Iterator<Pair<Long, KeyOffsets>> data)
+    public void add(Iterator<Pair<Long, LongSet>> data)
     {
         while (data.hasNext())
         {
-            Pair<Long, KeyOffsets> entry = data.next();
-            for (LongObjectCursor<long[]> cursor : entry.right)
-                for (long l : cursor.value)
-                    add(entry.left, cursor.key, l);
+            Pair<Long, LongSet> entry = data.next();
+            for (LongCursor l : entry.right)
+                add(entry.left, l.value);
         }
     }
 
-    public void add(SortedMap<Long, KeyOffsets> data)
+    public void add(SortedMap<Long, LongSet> data)
     {
-        for (Map.Entry<Long, KeyOffsets> newEntry : data.entrySet())
+        for (Map.Entry<Long, LongSet> newEntry : data.entrySet())
         {
-            for (LongObjectCursor<long[]> cursor : newEntry.getValue())
-                for (long l : cursor.value)
-                    add(newEntry.getKey(), cursor.key, l);
+            LongSet found = tokens.get(newEntry.getKey());
+            if (found == null)
+                tokens.put(newEntry.getKey(), (found = new LongOpenHashSet(4)));
+
+            for (LongCursor offset : newEntry.getValue())
+                found.add(offset.value);
         }
     }
 
-    public Iterator<Pair<Long, KeyOffsets>> iterator()
+    public Iterator<Pair<Long, LongSet>> iterator()
     {
-        final Iterator<Map.Entry<Long, KeyOffsets>> iterator = tokens.entrySet().iterator();
-        return new AbstractIterator<Pair<Long, KeyOffsets>>()
+        final Iterator<Map.Entry<Long, LongSet>> iterator = tokens.entrySet().iterator();
+        return new AbstractIterator<Pair<Long, LongSet>>()
         {
-            protected Pair<Long, KeyOffsets> computeNext()
+            protected Pair<Long, LongSet> computeNext()
             {
                 if (!iterator.hasNext())
                     return endOfData();
 
-                Map.Entry<Long, KeyOffsets> entry = iterator.next();
+                Map.Entry<Long, LongSet> entry = iterator.next();
                 return Pair.create(entry.getKey(), entry.getValue());
             }
         };
@@ -156,9 +161,9 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
 
     private class DynamicLeaf extends Leaf
     {
-        private final SortedMap<Long, KeyOffsets> tokens;
+        private final SortedMap<Long, LongSet> tokens;
 
-        DynamicLeaf(SortedMap<Long, KeyOffsets> data)
+        DynamicLeaf(SortedMap<Long, LongSet> data)
         {
             super(data.firstKey(), data.lastKey());
             tokens = data;
@@ -176,7 +181,7 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
 
         protected void serializeData(ByteBuffer buf)
         {
-            for (Map.Entry<Long, KeyOffsets> entry : tokens.entrySet())
+            for (Map.Entry<Long, LongSet> entry : tokens.entrySet())
                 createEntry(entry.getKey(), entry.getValue()).serialize(buf);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java b/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java
deleted file mode 100644
index db849fe..0000000
--- a/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.index.sasi.disk;
-
-import java.util.*;
-
-import org.apache.commons.lang3.ArrayUtils;
-
-import com.carrotsearch.hppc.LongObjectOpenHashMap;
-import com.carrotsearch.hppc.cursors.LongObjectCursor;
-
-public class KeyOffsets extends LongObjectOpenHashMap<long[]>
-{
-    public static final long NO_OFFSET = Long.MIN_VALUE;
-
-    public KeyOffsets() {
-        super(4);
-    }
-
-    public KeyOffsets(int initialCapacity) {
-        super(initialCapacity);
-    }
-
-    public void put(long currentPartitionOffset, long currentRowOffset)
-    {
-        if (containsKey(currentPartitionOffset))
-            super.put(currentPartitionOffset, append(get(currentPartitionOffset), currentRowOffset));
-        else
-            super.put(currentPartitionOffset, asArray(currentRowOffset));
-    }
-
-    public long[] put(long currentPartitionOffset, long[] currentRowOffset)
-    {
-        if (containsKey(currentPartitionOffset))
-            return super.put(currentPartitionOffset, merge(get(currentPartitionOffset), currentRowOffset));
-        else
-            return super.put(currentPartitionOffset, currentRowOffset);
-    }
-
-    public boolean equals(Object obj)
-    {
-        if (!(obj instanceof KeyOffsets))
-            return false;
-
-        KeyOffsets other = (KeyOffsets) obj;
-        if (other.size() != this.size())
-            return false;
-
-        for (LongObjectCursor<long[]> cursor : this)
-            if (!Arrays.equals(cursor.value, other.get(cursor.key)))
-                return false;
-
-        return true;
-    }
-
-    @Override
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder("KeyOffsets { ");
-        forEach((a, b) -> {
-            sb.append(a).append(": ").append(Arrays.toString(b));
-        });
-        sb.append(" }");
-        return sb.toString();
-    }
-
-    // primitive array creation
-    public static long[] asArray(long... vals)
-    {
-        return vals;
-    }
-
-    private static long[] merge(long[] arr1, long[] arr2)
-    {
-        long[] copy = new long[arr2.length];
-        int written = 0;
-        for (long l : arr2)
-        {
-            if (!ArrayUtils.contains(arr1, l))
-                copy[written++] = l;
-        }
-
-        if (written == 0)
-            return arr1;
-
-        long[] merged = new long[arr1.length + written];
-        System.arraycopy(arr1, 0, merged, 0, arr1.length);
-        System.arraycopy(copy, 0, merged, arr1.length, written);
-        return merged;
-    }
-
-    private static long[] append(long[] arr1, long v)
-    {
-        if (ArrayUtils.contains(arr1, v))
-            return arr1;
-        else
-            return ArrayUtils.add(arr1, v);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
index 70d24a7..4d43cd9 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
@@ -22,7 +22,8 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.stream.Collectors;
 
-import org.apache.cassandra.index.sasi.*;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.Term;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.plan.Expression.Op;
 import org.apache.cassandra.index.sasi.utils.MappedBuffer;
@@ -36,12 +37,12 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
+import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 
 import static org.apache.cassandra.index.sasi.disk.OnDiskBlock.SearchResult;
-import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.TOKEN_BYTES;
 
 public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
 {
@@ -105,7 +106,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
     protected final long indexSize;
     protected final boolean hasMarkedPartials;
 
-    protected final KeyFetcher keyFetcher;
+    protected final Function<Long, DecoratedKey> keyFetcher;
 
     protected final String indexPath;
 
@@ -115,7 +116,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
     protected final ByteBuffer minTerm, maxTerm, minKey, maxKey;
 
     @SuppressWarnings("resource")
-    public OnDiskIndex(File index, AbstractType<?> cmp, KeyFetcher keyReader)
+    public OnDiskIndex(File index, AbstractType<?> cmp, Function<Long, DecoratedKey> keyReader)
     {
         keyFetcher = keyReader;
 
@@ -634,7 +635,6 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
         {
             final long blockEnd = FBUtilities.align(content.position(), OnDiskIndexBuilder.BLOCK_SIZE);
 
-            // ([int] -1 for sparse, offset for non-sparse)
             if (isSparse())
                 return new PrefetchedTokensIterator(getSparseTokens());
 
@@ -658,7 +658,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
             NavigableMap<Long, Token> individualTokens = new TreeMap<>();
             for (int i = 0; i < size; i++)
             {
-                Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + TOKEN_BYTES * i), keyFetcher);
+                Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + (8 * i)), keyFetcher);
 
                 assert token != null;
                 individualTokens.put(token.get(), token);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
index b6e2da5..4946f06 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.dht.*;
 import org.apache.cassandra.index.sasi.plan.Expression.Op;
 import org.apache.cassandra.index.sasi.sa.IndexedTerm;
 import org.apache.cassandra.index.sasi.sa.IntegralSA;
@@ -38,6 +37,7 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 import com.carrotsearch.hppc.LongArrayList;
+import com.carrotsearch.hppc.LongSet;
 import com.carrotsearch.hppc.ShortArrayList;
 import com.google.common.annotations.VisibleForTesting;
 
@@ -163,7 +163,7 @@ public class OnDiskIndexBuilder
         this.marksPartials = marksPartials;
     }
 
-    public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long partitionOffset, long rowOffset)
+    public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long keyPosition)
     {
         if (term.remaining() >= MAX_TERM_SIZE)
         {
@@ -183,16 +183,16 @@ public class OnDiskIndexBuilder
             estimatedBytes += 64 + 48 + term.remaining();
         }
 
-        tokens.add((Long) key.getToken().getTokenValue(), partitionOffset, rowOffset);
+        tokens.add((Long) key.getToken().getTokenValue(), keyPosition);
 
         // calculate key range (based on actual key values) for current index
         minKey = (minKey == null || keyComparator.compare(minKey, key.getKey()) > 0) ? key.getKey() : minKey;
         maxKey = (maxKey == null || keyComparator.compare(maxKey, key.getKey()) < 0) ? key.getKey() : maxKey;
 
-        // 84 ((boolean(1)*4) + (long(8)*4) + 24 + 24) bytes for the LongObjectOpenHashMap<long[]> created
-        // when the keyPosition was added + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key).
+        // 60 ((boolean(1)*4) + (long(8)*4) + 24) bytes for the LongOpenHashSet created when the keyPosition was added
+        // + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key).
         // in the case of hash collision for the token we may overestimate but this is extremely rare
-        estimatedBytes += 84 + 40 + 8;
+        estimatedBytes += 60 + 40 + 8;
 
         return this;
     }
@@ -569,7 +569,7 @@ public class OnDiskIndexBuilder
         }
     }
 
-    private class MutableDataBlock extends MutableBlock<InMemoryDataTerm>
+    private static class MutableDataBlock extends MutableBlock<InMemoryDataTerm>
     {
         private static final int MAX_KEYS_SPARSE = 5;
 
@@ -651,7 +651,7 @@ public class OnDiskIndexBuilder
         {
             term.serialize(buffer);
             buffer.writeByte((byte) keys.getTokenCount());
-            for (Pair<Long, KeyOffsets> key : keys)
+            for (Pair<Long, LongSet> key : keys)
                 buffer.writeLong(key.left);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
index c204883..9fa4e87 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
@@ -109,7 +109,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
         currentKeyPosition = curPosition;
     }
 
-    public void nextUnfilteredCluster(Unfiltered unfiltered, long currentRowOffset)
+    public void nextUnfilteredCluster(Unfiltered unfiltered)
     {
         if (!unfiltered.isRow())
             return;
@@ -129,15 +129,10 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
             if (index == null)
                 indexes.put(column, (index = newIndex(columnIndex)));
 
-            index.add(value.duplicate(), currentKey, currentKeyPosition, currentRowOffset);
+            index.add(value.duplicate(), currentKey, currentKeyPosition);
         });
     }
 
-    public void nextUnfilteredCluster(Unfiltered unfilteredCluster)
-    {
-        throw new UnsupportedOperationException("SASI Index does not support direct row access.");
-    }
-
     public void complete()
     {
         if (isComplete)
@@ -202,7 +197,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
             this.currentBuilder = newIndexBuilder();
         }
 
-        public void add(ByteBuffer term, DecoratedKey key, long partitoinOffset, long rowOffset)
+        public void add(ByteBuffer term, DecoratedKey key, long keyPosition)
         {
             if (term.remaining() == 0)
                 return;
@@ -240,7 +235,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
                     }
                 }
 
-                currentBuilder.add(token, key, partitoinOffset, rowOffset);
+                currentBuilder.add(token, key, keyPosition);
                 isAdded = true;
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java b/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java
deleted file mode 100644
index fc5a2c0..0000000
--- a/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyclustering ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.index.sasi.disk;
-
-import java.util.*;
-import java.util.stream.*;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.utils.*;
-
-/**
- * Primary key of the found row, a combination of the Partition Key
- * and clustering that belongs to the row.
- */
-public class RowKey implements Comparable<RowKey>
-{
-
-    public final DecoratedKey decoratedKey;
-    public final Clustering clustering;
-
-    private final ClusteringComparator comparator;
-
-    public RowKey(DecoratedKey primaryKey, Clustering clustering, ClusteringComparator comparator)
-    {
-        this.decoratedKey = primaryKey;
-        this.clustering = clustering;
-        this.comparator = comparator;
-    }
-
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        RowKey rowKey = (RowKey) o;
-
-        if (decoratedKey != null ? !decoratedKey.equals(rowKey.decoratedKey) : rowKey.decoratedKey != null)
-            return false;
-        return clustering != null ? clustering.equals(rowKey.clustering) : rowKey.clustering == null;
-    }
-
-    public int hashCode()
-    {
-        return new HashCodeBuilder().append(decoratedKey).append(clustering).toHashCode();
-    }
-
-    public int compareTo(RowKey other)
-    {
-        int cmp = this.decoratedKey.compareTo(other.decoratedKey);
-        if (cmp == 0 && clustering != null)
-        {
-            // Both clustering and rows should match
-            if (clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING || other.clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING)
-                return 0;
-
-            return comparator.compare(this.clustering, other.clustering);
-        }
-        else
-        {
-            return cmp;
-        }
-    }
-
-    public static RowKeyComparator COMPARATOR = new RowKeyComparator();
-
-    public String toString(CFMetaData metadata)
-    {
-        return String.format("RowKey: { pk : %s, clustering: %s}",
-                             metadata.getKeyValidator().getString(decoratedKey.getKey()),
-                             clustering.toString(metadata));
-    }
-
-    @Override
-    public String toString()
-    {
-        return String.format("RowKey: { pk : %s, clustering: %s}",
-                             ByteBufferUtil.bytesToHex(decoratedKey.getKey()),
-                             String.join(",", Arrays.stream(clustering.getRawValues()).map(ByteBufferUtil::bytesToHex).collect(Collectors.toList())));
-    }
-
-    private static class RowKeyComparator implements Comparator<RowKey>
-    {
-        public int compare(RowKey o1, RowKey o2)
-        {
-            return o1.compareTo(o2);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
index 8a11d60..6e64c56 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
@@ -19,7 +19,8 @@ package org.apache.cassandra.index.sasi.disk;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Iterator;
+import java.util.SortedMap;
 
 import org.apache.cassandra.index.sasi.utils.CombinedTerm;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
@@ -27,6 +28,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.Pair;
 
+import com.carrotsearch.hppc.LongSet;
 import com.google.common.collect.Iterators;
 
 /**
@@ -61,17 +63,17 @@ public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder
         combinedTerm = term;
     }
 
-    public void add(Long token, long partitionOffset, long rowOffset)
+    public void add(Long token, long keyPosition)
     {
         throw new UnsupportedOperationException();
     }
 
-    public void add(SortedMap<Long, KeyOffsets> data)
+    public void add(SortedMap<Long, LongSet> data)
     {
         throw new UnsupportedOperationException();
     }
 
-    public void add(Iterator<Pair<Long, KeyOffsets>> data)
+    public void add(Iterator<Pair<Long, LongSet>> data)
     {
         throw new UnsupportedOperationException();
     }
@@ -81,12 +83,12 @@ public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder
         return tokenCount == 0;
     }
 
-    public Iterator<Pair<Long, KeyOffsets>> iterator()
+    public Iterator<Pair<Long, LongSet>> iterator()
     {
-        @SuppressWarnings("resource") Iterator<Token> iterator = combinedTerm.getTokenIterator();
-        return new AbstractIterator<Pair<Long, KeyOffsets>>()
+        Iterator<Token> iterator = combinedTerm.getTokenIterator();
+        return new AbstractIterator<Pair<Long, LongSet>>()
         {
-            protected Pair<Long, KeyOffsets> computeNext()
+            protected Pair<Long, LongSet> computeNext()
             {
                 if (!iterator.hasNext())
                     return endOfData();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Token.java b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
index 8ea864f..4cd1ea3 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/Token.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
@@ -19,9 +19,12 @@ package org.apache.cassandra.index.sasi.disk;
 
 import com.google.common.primitives.Longs;
 
-import org.apache.cassandra.index.sasi.utils.*;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.utils.CombinedValue;
 
-public abstract class Token implements CombinedValue<Long>, Iterable<RowKey>
+import com.carrotsearch.hppc.LongSet;
+
+public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKey>
 {
     protected final long token;
 
@@ -35,7 +38,7 @@ public abstract class Token implements CombinedValue<Long>, Iterable<RowKey>
         return token;
     }
 
-    public abstract KeyOffsets getOffsets();
+    public abstract LongSet getOffsets();
 
     public int compareTo(CombinedValue<Long> o)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
index 1969627..c69ce00 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
@@ -19,21 +19,22 @@ package org.apache.cassandra.index.sasi.disk;
 
 import java.io.IOException;
 import java.util.*;
-import java.util.stream.*;
 
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.utils.AbstractIterator;
+import org.apache.cassandra.index.sasi.utils.CombinedValue;
+import org.apache.cassandra.index.sasi.utils.MappedBuffer;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.utils.MergeIterator;
+
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
-import com.carrotsearch.hppc.cursors.LongObjectCursor;
-import org.apache.cassandra.index.sasi.*;
-import org.apache.cassandra.index.sasi.disk.Descriptor.*;
-import org.apache.cassandra.index.sasi.utils.AbstractIterator;
-import org.apache.cassandra.index.sasi.utils.*;
-import org.apache.cassandra.utils.*;
-
-import static org.apache.cassandra.index.sasi.disk.Descriptor.Version.*;
-import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.*;
+import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType;
 
 // Note: all of the seek-able offsets contained in TokenTree should be sizeof(long)
 // even if currently only lower int portion of them if used, because that makes
@@ -41,6 +42,9 @@ import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.*;
 // without any on-disk format changes and/or re-indexing if one day we'll have a need to.
 public class TokenTree
 {
+    private static final int LONG_BYTES = Long.SIZE / 8;
+    private static final int SHORT_BYTES = Short.SIZE / 8;
+
     private final Descriptor descriptor;
     private final MappedBuffer file;
     private final long startPos;
@@ -62,7 +66,8 @@ public class TokenTree
 
         file.position(startPos + TokenTreeBuilder.SHARED_HEADER_BYTES);
 
-        validateMagic();
+        if (!validateMagic())
+            throw new IllegalArgumentException("invalid token tree");
 
         tokenCount = file.getLong();
         treeMinToken = file.getLong();
@@ -74,12 +79,12 @@ public class TokenTree
         return tokenCount;
     }
 
-    public RangeIterator<Long, Token> iterator(KeyFetcher keyFetcher)
+    public RangeIterator<Long, Token> iterator(Function<Long, DecoratedKey> keyFetcher)
     {
         return new TokenTreeIterator(file.duplicate(), keyFetcher);
     }
 
-    public OnDiskToken get(final long searchToken, KeyFetcher keyFetcher)
+    public OnDiskToken get(final long searchToken, Function<Long, DecoratedKey> keyFetcher)
     {
         seekToLeaf(searchToken, file);
         long leafStart = file.position();
@@ -90,24 +95,21 @@ public class TokenTree
 
         file.position(leafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES);
 
-        OnDiskToken token = getTokenAt(file, tokenIndex, leafSize, keyFetcher);
-
+        OnDiskToken token = OnDiskToken.getTokenAt(file, tokenIndex, leafSize, keyFetcher);
         return token.get().equals(searchToken) ? token : null;
     }
 
-    private void validateMagic()
+    private boolean validateMagic()
     {
-        if (descriptor.version == aa)
-            return;
-
-        short magic = file.getShort();
-        if (descriptor.version == Version.ab && magic == TokenTreeBuilder.AB_MAGIC)
-            return;
-
-        if (descriptor.version == Version.ac && magic == TokenTreeBuilder.AC_MAGIC)
-            return;
-
-        throw new IllegalArgumentException("invalid token tree. Written magic: '" + ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(magic)) + "'");
+        switch (descriptor.version.toString())
+        {
+            case Descriptor.VERSION_AA:
+                return true;
+            case Descriptor.VERSION_AB:
+                return TokenTreeBuilder.AB_MAGIC == file.getShort();
+            default:
+                return false;
+        }
     }
 
     // finds leaf that *could* contain token
@@ -134,16 +136,18 @@ public class TokenTree
             long minToken = file.getLong();
             long maxToken = file.getLong();
 
-            long seekBase = blockStart + BLOCK_HEADER_BYTES;
+            long seekBase = blockStart + TokenTreeBuilder.BLOCK_HEADER_BYTES;
             if (minToken > token)
             {
                 // seek to beginning of child offsets to locate first child
-                file.position(seekBase + tokenCount * TOKEN_BYTES);
+                file.position(seekBase + tokenCount * LONG_BYTES);
+                blockStart = (startPos + (int) file.getLong());
             }
             else if (maxToken < token)
             {
                 // seek to end of child offsets to locate last child
-                file.position(seekBase + (2 * tokenCount) * TOKEN_BYTES);
+                file.position(seekBase + (2 * tokenCount) * LONG_BYTES);
+                blockStart = (startPos + (int) file.getLong());
             }
             else
             {
@@ -154,11 +158,12 @@ public class TokenTree
 
                 // file pointer is now at beginning of offsets
                 if (offsetIndex == tokenCount)
-                    file.position(file.position() + (offsetIndex * TOKEN_BYTES));
+                    file.position(file.position() + (offsetIndex * LONG_BYTES));
                 else
-                    file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * TOKEN_BYTES);
+                    file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * LONG_BYTES);
+
+                blockStart = (startPos + (int) file.getLong());
             }
-            blockStart = (startPos + (int) file.getLong());
         }
     }
 
@@ -167,7 +172,8 @@ public class TokenTree
         short offsetIndex = 0;
         for (int i = 0; i < tokenCount; i++)
         {
-            if (searchToken < file.getLong())
+            long readToken = file.getLong();
+            if (searchToken < readToken)
                 break;
 
             offsetIndex++;
@@ -187,7 +193,10 @@ public class TokenTree
         while (start <= end)
         {
             middle = start + ((end - start) >> 1);
-            long token = file.getLong(base + middle * LEAF_ENTRY_BYTES + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES));
+
+            // each entry is 16 bytes wide, token is in bytes 4-11
+            long token = file.getLong(base + (middle * (2 * LONG_BYTES) + 4));
+
             if (token == searchToken)
                 break;
 
@@ -200,9 +209,9 @@ public class TokenTree
         return (short) middle;
     }
 
-    private class TokenTreeIterator extends RangeIterator<Long, Token>
+    public class TokenTreeIterator extends RangeIterator<Long, Token>
     {
-        private final KeyFetcher keyFetcher;
+        private final Function<Long, DecoratedKey> keyFetcher;
         private final MappedBuffer file;
 
         private long currentLeafStart;
@@ -215,7 +224,7 @@ public class TokenTree
         protected boolean firstIteration = true;
         private boolean lastLeaf;
 
-        TokenTreeIterator(MappedBuffer file, KeyFetcher keyFetcher)
+        TokenTreeIterator(MappedBuffer file, Function<Long, DecoratedKey> keyFetcher)
         {
             super(treeMinToken, treeMaxToken, tokenCount);
 
@@ -305,13 +314,13 @@ public class TokenTree
 
         private Token getTokenAt(int idx)
         {
-            return TokenTree.this.getTokenAt(file, idx, leafSize, keyFetcher);
+            return OnDiskToken.getTokenAt(file, idx, leafSize, keyFetcher);
         }
 
         private long getTokenPosition(int idx)
         {
-            // skip entry header to get position pointing directly at the entry's token
-            return TokenTree.this.getEntryPosition(idx, file, descriptor) + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES);
+            // skip 4 byte entry header to get position pointing directly at the entry's token
+            return OnDiskToken.getEntryPosition(idx, file) + (2 * SHORT_BYTES);
         }
 
         private void seekToNextLeaf()
@@ -338,15 +347,15 @@ public class TokenTree
         }
     }
 
-    public class OnDiskToken extends Token
+    public static class OnDiskToken extends Token
     {
         private final Set<TokenInfo> info = new HashSet<>(2);
-        private final Set<RowKey> loadedKeys = new TreeSet<>(RowKey.COMPARATOR);
+        private final Set<DecoratedKey> loadedKeys = new TreeSet<>(DecoratedKey.comparator);
 
-        private OnDiskToken(MappedBuffer buffer, long position, short leafSize, KeyFetcher keyFetcher)
+        public OnDiskToken(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
         {
-            super(buffer.getLong(position + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES)));
-            info.add(new TokenInfo(buffer, position, leafSize, keyFetcher, descriptor));
+            super(buffer.getLong(position + (2 * SHORT_BYTES)));
+            info.add(new TokenInfo(buffer, position, leafSize, keyFetcher));
         }
 
         public void merge(CombinedValue<Long> other)
@@ -368,9 +377,9 @@ public class TokenTree
             }
         }
 
-        public Iterator<RowKey> iterator()
+        public Iterator<DecoratedKey> iterator()
         {
-            List<Iterator<RowKey>> keys = new ArrayList<>(info.size());
+            List<Iterator<DecoratedKey>> keys = new ArrayList<>(info.size());
 
             for (TokenInfo i : info)
                 keys.add(i.iterator());
@@ -378,72 +387,68 @@ public class TokenTree
             if (!loadedKeys.isEmpty())
                 keys.add(loadedKeys.iterator());
 
-            return MergeIterator.get(keys, RowKey.COMPARATOR, new MergeIterator.Reducer<RowKey, RowKey>()
+            return MergeIterator.get(keys, DecoratedKey.comparator, new MergeIterator.Reducer<DecoratedKey, DecoratedKey>()
             {
-                RowKey reduced = null;
+                DecoratedKey reduced = null;
 
                 public boolean trivialReduceIsTrivial()
                 {
                     return true;
                 }
 
-                public void reduce(int idx, RowKey current)
+                public void reduce(int idx, DecoratedKey current)
                 {
                     reduced = current;
                 }
 
-                protected RowKey getReduced()
+                protected DecoratedKey getReduced()
                 {
                     return reduced;
                 }
             });
         }
 
-        public KeyOffsets getOffsets()
+        public LongSet getOffsets()
         {
-            KeyOffsets offsets = new KeyOffsets();
+            LongSet offsets = new LongOpenHashSet(4);
             for (TokenInfo i : info)
             {
-                for (LongObjectCursor<long[]> offset : i.fetchOffsets())
-                    offsets.put(offset.key, offset.value);
+                for (long offset : i.fetchOffsets())
+                    offsets.add(offset);
             }
 
             return offsets;
         }
-    }
 
-    private OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, KeyFetcher keyFetcher)
-    {
-        return new OnDiskToken(buffer, getEntryPosition(idx, buffer, descriptor), leafSize, keyFetcher);
-    }
-
-    private long getEntryPosition(int idx, MappedBuffer file, Descriptor descriptor)
-    {
-        if (descriptor.version.compareTo(Version.ac) < 0)
-            return file.position() + (idx * LEGACY_LEAF_ENTRY_BYTES);
+        public static OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, Function<Long, DecoratedKey> keyFetcher)
+        {
+            return new OnDiskToken(buffer, getEntryPosition(idx, buffer), leafSize, keyFetcher);
+        }
 
-        // skip n entries, to the entry with the given index
-        return file.position() + (idx * LEAF_ENTRY_BYTES);
+        private static long getEntryPosition(int idx, MappedBuffer file)
+        {
+            // info (4 bytes) + token (8 bytes) + offset (4 bytes) = 16 bytes
+            return file.position() + (idx * (2 * LONG_BYTES));
+        }
     }
 
     private static class TokenInfo
     {
         private final MappedBuffer buffer;
-        private final KeyFetcher keyFetcher;
-        private final Descriptor descriptor;
+        private final Function<Long, DecoratedKey> keyFetcher;
+
         private final long position;
         private final short leafSize;
 
-        public TokenInfo(MappedBuffer buffer, long position, short leafSize, KeyFetcher keyFetcher, Descriptor descriptor)
+        public TokenInfo(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
         {
             this.keyFetcher = keyFetcher;
             this.buffer = buffer;
             this.position = position;
             this.leafSize = leafSize;
-            this.descriptor = descriptor;
         }
 
-        public Iterator<RowKey> iterator()
+        public Iterator<DecoratedKey> iterator()
         {
             return new KeyIterator(keyFetcher, fetchOffsets());
         }
@@ -460,154 +465,59 @@ public class TokenTree
 
             TokenInfo o = (TokenInfo) other;
             return keyFetcher == o.keyFetcher && position == o.position;
-
         }
 
-        /**
-         * Legacy leaf storage format (used for reading data formats before AC):
-         *
-         *    [(short) leaf type][(short) offset extra bytes][(long) token][(int) offsetData]
-         *
-         * Many pairs can be encoded into long+int.
-         *
-         * Simple entry: offset fits into (int)
-         *
-         *    [(short) leaf type][(short) offset extra bytes][(long) token][(int) offsetData]
-         *
-         * FactoredOffset: a single offset, offset fits into (long)+(int) bits:
-         *
-         *    [(short) leaf type][(short) 16 bytes of remained offset][(long) token][(int) top 32 bits of offset]
-         *
-         * PackedCollisionEntry: packs the two offset entries into int and a short (if both of them fit into
-         * (long) and one of them fits into (int))
-         *
-         *    [(short) leaf type][(short) 16 the offset that'd fit into short][(long) token][(int) 32 bits of offset that'd fit into int]
-         *
-         * Otherwise, the rest gets packed into limited-size overflow collision entry
-         *
-         *    [(short) leaf type][(short) count][(long) token][(int) start index]
-         */
-        private KeyOffsets fetchOffsetsLegacy()
+        private long[] fetchOffsets()
         {
             short info = buffer.getShort(position);
             // offset extra is unsigned short (right-most 16 bits of 48 bits allowed for an offset)
-            int offsetExtra = buffer.getShort(position + Short.BYTES) & 0xFFFF;
+            int offsetExtra = buffer.getShort(position + SHORT_BYTES) & 0xFFFF;
             // is the it left-most (32-bit) base of the actual offset in the index file
-            int offsetData = buffer.getInt(position + (2 * Short.BYTES) + Long.BYTES);
+            int offsetData = buffer.getInt(position + (2 * SHORT_BYTES) + LONG_BYTES);
 
             EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK);
 
-            KeyOffsets rowOffsets = new KeyOffsets();
             switch (type)
             {
                 case SIMPLE:
-                    rowOffsets.put(offsetData, KeyOffsets.NO_OFFSET);
-                    break;
+                    return new long[] { offsetData };
+
                 case OVERFLOW:
-                    long offsetPos = (buffer.position() + (2 * (leafSize * Long.BYTES)) + (offsetData * Long.BYTES));
+                    long[] offsets = new long[offsetExtra]; // offsetShort contains count of tokens
+                    long offsetPos = (buffer.position() + (2 * (leafSize * LONG_BYTES)) + (offsetData * LONG_BYTES));
 
                     for (int i = 0; i < offsetExtra; i++)
-                    {
-                        long offset = buffer.getLong(offsetPos + (i * Long.BYTES));;
-                        rowOffsets.put(offset, KeyOffsets.NO_OFFSET);
-                    }
-                    break;
-                case FACTORED:
-                    long offset = (((long) offsetData) << Short.SIZE) + offsetExtra;
-                    rowOffsets.put(offset, KeyOffsets.NO_OFFSET);
-                    break;
-                case PACKED:
-                    rowOffsets.put(offsetExtra, KeyOffsets.NO_OFFSET);
-                    rowOffsets.put(offsetData, KeyOffsets.NO_OFFSET);
-                default:
-                    throw new IllegalStateException("Unknown entry type: " + type);
-            }
-            return rowOffsets;
-        }
+                        offsets[i] = buffer.getLong(offsetPos + (i * LONG_BYTES));
 
-        private KeyOffsets fetchOffsets()
-        {
-            if (descriptor.version.compareTo(Version.ac) < 0)
-                return fetchOffsetsLegacy();
-
-            short info = buffer.getShort(position);
-            EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK);
+                    return offsets;
 
-            KeyOffsets rowOffsets = new KeyOffsets();
-            long baseOffset = position + LEAF_ENTRY_TYPE_BYTES + TOKEN_BYTES;
-            switch (type)
-            {
-                case SIMPLE:
-                    long partitionOffset = buffer.getLong(baseOffset);
-                    long rowOffset = buffer.getLong(baseOffset + LEAF_PARTITON_OFFSET_BYTES);
+                case FACTORED:
+                    return new long[] { (((long) offsetData) << Short.SIZE) + offsetExtra };
 
-                    rowOffsets.put(partitionOffset, rowOffset);
-                    break;
                 case PACKED:
-                    long partitionOffset1 = buffer.getInt(baseOffset);
-                    long rowOffset1 = buffer.getInt(baseOffset + LEAF_PARTITON_OFFSET_PACKED_BYTES);
-
-                    long partitionOffset2 = buffer.getInt(baseOffset + LEAF_PARTITON_OFFSET_PACKED_BYTES + LEAF_ROW_OFFSET_PACKED_BYTES);
-                    long rowOffset2 = buffer.getInt(baseOffset + 2 * LEAF_PARTITON_OFFSET_PACKED_BYTES + LEAF_ROW_OFFSET_PACKED_BYTES);
+                    return new long[] { offsetExtra, offsetData };
 
-                    rowOffsets.put(partitionOffset1, rowOffset1);
-                    rowOffsets.put(partitionOffset2, rowOffset2);
-                    break;
-                case OVERFLOW:
-                    long collisionOffset = buffer.getLong(baseOffset);
-                    long count = buffer.getLong(baseOffset + LEAF_PARTITON_OFFSET_BYTES);
-
-                    // Skip leaves and collision offsets that do not belong to current token
-                    long offsetPos = buffer.position() + leafSize *  LEAF_ENTRY_BYTES + collisionOffset * COLLISION_ENTRY_BYTES;
-
-                    for (int i = 0; i < count; i++)
-                    {
-                        long currentPartitionOffset = buffer.getLong(offsetPos + i * COLLISION_ENTRY_BYTES);
-                        long currentRowOffset = buffer.getLong(offsetPos + i * COLLISION_ENTRY_BYTES + LEAF_PARTITON_OFFSET_BYTES);
-
-                        rowOffsets.put(currentPartitionOffset, currentRowOffset);
-                    }
-                    break;
                 default:
                     throw new IllegalStateException("Unknown entry type: " + type);
             }
-
-
-            return rowOffsets;
         }
     }
 
-    private static class KeyIterator extends AbstractIterator<RowKey>
+    private static class KeyIterator extends AbstractIterator<DecoratedKey>
     {
-        private final KeyFetcher keyFetcher;
-        private final Iterator<LongObjectCursor<long[]>> offsets;
-        private long currentPatitionKey;
-        private PrimitiveIterator.OfLong currentCursor = null;
+        private final Function<Long, DecoratedKey> keyFetcher;
+        private final long[] offsets;
+        private int index = 0;
 
-        public KeyIterator(KeyFetcher keyFetcher, KeyOffsets offsets)
+        public KeyIterator(Function<Long, DecoratedKey> keyFetcher, long[] offsets)
         {
             this.keyFetcher = keyFetcher;
-            this.offsets = offsets.iterator();
+            this.offsets = offsets;
         }
 
-        public RowKey computeNext()
+        public DecoratedKey computeNext()
         {
-            if (currentCursor != null && currentCursor.hasNext())
-            {
-                return keyFetcher.getRowKey(currentPatitionKey, currentCursor.nextLong());
-            }
-            else if (offsets.hasNext())
-            {
-                LongObjectCursor<long[]> cursor = offsets.next();
-                currentPatitionKey = cursor.key;
-                currentCursor = LongStream.of(cursor.value).iterator();
-
-                return keyFetcher.getRowKey(currentPatitionKey, currentCursor.nextLong());
-            }
-            else
-            {
-                return endOfData();
-            }
+            return index < offsets.length ? keyFetcher.apply(offsets[index++]) : endOfData();
         }
     }
-}
+}
\ No newline at end of file