You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2016/09/06 05:19:19 UTC

[3/3] cassandra git commit: Add row offset support to SASI

Add row offset support to SASI

Patch by Alex Petrov; reviewed by Pavel Yaskevich for CASSANDRA-11990


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7d857b46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7d857b46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7d857b46

Branch: refs/heads/trunk
Commit: 7d857b46fb070548bf5e5f6ff81db588f08ec22a
Parents: 3c95d47
Author: Alex Petrov <ol...@gmail.com>
Authored: Sun Jun 26 18:21:08 2016 +0200
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Mon Sep 5 22:17:11 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ColumnIndex.java    |   6 +-
 .../apache/cassandra/index/sasi/KeyFetcher.java |  98 +++++++
 .../apache/cassandra/index/sasi/SASIIndex.java  |  11 +-
 .../cassandra/index/sasi/SASIIndexBuilder.java  |  13 +-
 .../cassandra/index/sasi/SSTableIndex.java      |  41 +--
 .../cassandra/index/sasi/conf/ColumnIndex.java  |   4 +-
 .../index/sasi/conf/view/RangeTermTree.java     |   4 +
 .../sasi/disk/AbstractTokenTreeBuilder.java     | 276 ++++++++++--------
 .../cassandra/index/sasi/disk/Descriptor.java   |  33 ++-
 .../sasi/disk/DynamicTokenTreeBuilder.java      |  59 ++--
 .../cassandra/index/sasi/disk/KeyOffsets.java   | 115 ++++++++
 .../cassandra/index/sasi/disk/OnDiskIndex.java  |  12 +-
 .../index/sasi/disk/OnDiskIndexBuilder.java     |  16 +-
 .../index/sasi/disk/PerSSTableIndexWriter.java  |  13 +-
 .../cassandra/index/sasi/disk/RowKey.java       | 108 +++++++
 .../index/sasi/disk/StaticTokenTreeBuilder.java |  18 +-
 .../apache/cassandra/index/sasi/disk/Token.java |  10 +-
 .../cassandra/index/sasi/disk/TokenTree.java    | 288 ++++++++++++-------
 .../index/sasi/disk/TokenTreeBuilder.java       |  72 +++--
 .../index/sasi/memory/IndexMemtable.java        |   8 +-
 .../index/sasi/memory/KeyRangeIterator.java     |  49 ++--
 .../cassandra/index/sasi/memory/MemIndex.java   |   4 +-
 .../index/sasi/memory/SkipListMemIndex.java     |  12 +-
 .../index/sasi/memory/TrieMemIndex.java         |  45 ++-
 .../index/sasi/plan/QueryController.java        |  49 ++--
 .../cassandra/index/sasi/plan/QueryPlan.java    | 174 ++++++++---
 .../io/sstable/format/SSTableFlushObserver.java |   5 +
 .../io/sstable/format/SSTableReader.java        |  33 ++-
 .../io/sstable/format/big/BigTableWriter.java   |   8 +-
 .../org/apache/cassandra/utils/obs/BitUtil.java |   2 +-
 test/data/legacy-sasi/on-disk-sa-int2.db        | Bin 0 -> 12312 bytes
 .../cassandra/index/sasi/SASIIndexTest.java     |  25 +-
 .../index/sasi/disk/KeyOffsetsTest.java         |  48 ++++
 .../index/sasi/disk/OnDiskIndexTest.java        | 216 +++++++-------
 .../sasi/disk/PerSSTableIndexWriterTest.java    | 112 ++++++--
 .../index/sasi/disk/TokenTreeTest.java          | 208 +++++++-------
 .../index/sasi/plan/OperationTest.java          |   2 +-
 .../index/sasi/utils/KeyConverter.java          |  69 +++++
 .../index/sasi/utils/LongIterator.java          |   8 +-
 .../sasi/utils/RangeUnionIteratorTest.java      |  17 ++
 41 files changed, 1547 insertions(+), 745 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c466dfe..28c2d84 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Add row offset support to SASI (CASSANDRA-11990)
  * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
  * Add keep-alive to streaming (CASSANDRA-11841)
  * Tracing payload is passed through newSession(..) (CASSANDRA-11706)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 1116cc2..1f39927 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -121,9 +121,10 @@ public class ColumnIndex
         {
             Row staticRow = iterator.staticRow();
 
+            long startPosition = currentPosition();
             UnfilteredSerializer.serializer.serializeStaticRow(staticRow, header, writer, version);
             if (!observers.isEmpty())
-                observers.forEach((o) -> o.nextUnfilteredCluster(staticRow));
+                observers.forEach((o) -> o.nextUnfilteredCluster(staticRow, startPosition));
         }
     }
 
@@ -232,6 +233,7 @@ public class ColumnIndex
 
     private void add(Unfiltered unfiltered) throws IOException
     {
+        final long origPos = writer.position();
         long pos = currentPosition();
 
         if (firstClustering == null)
@@ -245,7 +247,7 @@ public class ColumnIndex
 
         // notify observers about each new row
         if (!observers.isEmpty())
-            observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered));
+            observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered, origPos));
 
         lastClustering = unfiltered.clustering();
         previousRowStart = pos;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java b/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java
new file mode 100644
index 0000000..80ee167
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.index.sasi.disk.*;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.io.sstable.format.*;
+
+
+public interface KeyFetcher
+{
+    public Clustering getClustering(long offset);
+    public DecoratedKey getPartitionKey(long offset);
+
+    public RowKey getRowKey(long partitionOffset, long rowOffset);
+
+    /**
+     * Fetches clustering and partition key from the sstable.
+     *
+     * Currently, clustering key is fetched from the data file of the sstable and partition key is
+     * read from the index file. Reading from index file helps us to warm up key cache in this case.
+     */
+    public static class SSTableKeyFetcher implements KeyFetcher
+    {
+        private final SSTableReader sstable;
+
+        public SSTableKeyFetcher(SSTableReader reader)
+        {
+            sstable = reader;
+        }
+
+        @Override
+        public Clustering getClustering(long offset)
+        {
+            try
+            {
+                return sstable.clusteringAt(offset);
+            }
+            catch (IOException e)
+            {
+                throw new FSReadError(new IOException("Failed to read clustering from " + sstable.descriptor, e), sstable.getFilename());
+            }
+        }
+
+        @Override
+        public DecoratedKey getPartitionKey(long offset)
+        {
+            try
+            {
+                return sstable.keyAt(offset);
+            }
+            catch (IOException e)
+            {
+                throw new FSReadError(new IOException("Failed to read key from " + sstable.descriptor, e), sstable.getFilename());
+            }
+        }
+
+        @Override
+        public RowKey getRowKey(long partitionOffset, long rowOffset)
+        {
+            if (rowOffset == KeyOffsets.NO_OFFSET)
+                return new RowKey(getPartitionKey(partitionOffset), null, sstable.metadata.comparator);
+            else
+                return new RowKey(getPartitionKey(partitionOffset), getClustering(rowOffset), sstable.metadata.comparator);
+        }
+
+        public int hashCode()
+        {
+            return sstable.descriptor.hashCode();
+        }
+
+        public boolean equals(Object other)
+        {
+            return other instanceof SSTableKeyFetcher
+                   && sstable.descriptor.equals(((SSTableKeyFetcher) other).sstable.descriptor);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index 4375964..65953a9 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.index.sasi.conf.ColumnIndex;
 import org.apache.cassandra.index.sasi.conf.IndexMode;
 import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder.Mode;
 import org.apache.cassandra.index.sasi.disk.PerSSTableIndexWriter;
+import org.apache.cassandra.index.sasi.disk.RowKey;
 import org.apache.cassandra.index.sasi.plan.QueryPlan;
 import org.apache.cassandra.index.transactions.IndexTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -182,6 +183,14 @@ public class SASIIndex implements Index, INotificationConsumer
         return getTruncateTask(FBUtilities.timestampMicros());
     }
 
+    public Callable<?> getTruncateTask(Collection<SSTableReader> sstablesToRebuild)
+    {
+        return () -> {
+            index.dropData(sstablesToRebuild);
+            return null;
+        };
+    }
+
     public Callable<?> getTruncateTask(long truncatedAt)
     {
         return () -> {
@@ -252,7 +261,7 @@ public class SASIIndex implements Index, INotificationConsumer
             public void insertRow(Row row)
             {
                 if (isNewData())
-                    adjustMemtableSize(index.index(key, row), opGroup);
+                    adjustMemtableSize(index.index(new RowKey(key, row.clustering(), baseCfs.getComparator()), row), opGroup);
             }
 
             public void updateRow(Row oldRow, Row newRow)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
index d50875a..d6706ea 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
@@ -94,16 +94,23 @@ class SASIIndexBuilder extends SecondaryIndexBuilder
                         {
                             RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
                             dataFile.seek(indexEntry.position);
-                            ByteBufferUtil.readWithShortLength(dataFile); // key
+                            int staticOffset = ByteBufferUtil.readWithShortLength(dataFile).remaining(); // key
 
                             try (SSTableIdentityIterator partition = SSTableIdentityIterator.create(sstable, dataFile, key))
                             {
                                 // if the row has statics attached, it has to be indexed separately
                                 if (cfs.metadata.hasStaticColumns())
-                                    indexWriter.nextUnfilteredCluster(partition.staticRow());
+                                {
+                                    long staticPosition = indexEntry.position + staticOffset;
+                                    indexWriter.nextUnfilteredCluster(partition.staticRow(), staticPosition);
+                                }
 
+                                long position = dataFile.getPosition();
                                 while (partition.hasNext())
-                                    indexWriter.nextUnfilteredCluster(partition.next());
+                                {
+                                    indexWriter.nextUnfilteredCluster(partition.next(), position);
+                                    position = dataFile.getPosition();
+                                }
                             }
                         }
                         catch (IOException ex)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
index c67c39c..f9c8abf 100644
--- a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
@@ -18,28 +18,22 @@
 package org.apache.cassandra.index.sasi;
 
 import java.io.File;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.index.sasi.conf.ColumnIndex;
-import org.apache.cassandra.index.sasi.disk.OnDiskIndex;
-import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.disk.*;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
-import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.concurrent.Ref;
 
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
-import com.google.common.base.Function;
-
 public class SSTableIndex
 {
     private final ColumnIndex columnIndex;
@@ -65,7 +59,7 @@ public class SSTableIndex
                 sstable.getFilename(),
                 columnIndex.getIndexName());
 
-        this.index = new OnDiskIndex(indexFile, validator, new DecoratedKeyFetcher(sstable));
+        this.index = new OnDiskIndex(indexFile, validator, new KeyFetcher.SSTableKeyFetcher(sstable));
     }
 
     public OnDiskIndexBuilder.Mode mode()
@@ -163,36 +157,5 @@ public class SSTableIndex
         return String.format("SSTableIndex(column: %s, SSTable: %s)", columnIndex.getColumnName(), sstable.descriptor);
     }
 
-    private static class DecoratedKeyFetcher implements Function<Long, DecoratedKey>
-    {
-        private final SSTableReader sstable;
-
-        DecoratedKeyFetcher(SSTableReader reader)
-        {
-            sstable = reader;
-        }
-
-        public DecoratedKey apply(Long offset)
-        {
-            try
-            {
-                return sstable.keyAt(offset);
-            }
-            catch (IOException e)
-            {
-                throw new FSReadError(new IOException("Failed to read key from " + sstable.descriptor, e), sstable.getFilename());
-            }
-        }
-
-        public int hashCode()
-        {
-            return sstable.descriptor.hashCode();
-        }
 
-        public boolean equals(Object other)
-        {
-            return other instanceof DecoratedKeyFetcher
-                    && sstable.descriptor.equals(((DecoratedKeyFetcher) other).sstable.descriptor);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
index 0958113..459e5c3 100644
--- a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
@@ -30,7 +30,6 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.AsciiType;
@@ -40,6 +39,7 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
 import org.apache.cassandra.index.sasi.conf.view.View;
 import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.disk.RowKey;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.memory.IndexMemtable;
 import org.apache.cassandra.index.sasi.plan.Expression;
@@ -99,7 +99,7 @@ public class ColumnIndex
         return keyValidator;
     }
 
-    public long index(DecoratedKey key, Row row)
+    public long index(RowKey key, Row row)
     {
         return getCurrentMemtable().index(key, getValueOf(column, row, FBUtilities.nowInSeconds()));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
index d6b4551..2775c29 100644
--- a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
+++ b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.index.sasi.conf.view;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -46,6 +47,9 @@ public class RangeTermTree implements TermTree
 
     public Set<SSTableIndex> search(Expression e)
     {
+        if (e == null)
+            return Collections.emptySet();
+
         ByteBuffer minTerm = e.lower == null ? min : e.lower.value;
         ByteBuffer maxTerm = e.upper == null ? max : e.upper.value;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
index 18994de..9245960 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
@@ -20,19 +20,18 @@ package org.apache.cassandra.index.sasi.disk;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
+import java.util.function.*;
 
+import com.carrotsearch.hppc.LongArrayList;
+import com.carrotsearch.hppc.cursors.LongCursor;
+import com.carrotsearch.hppc.cursors.LongObjectCursor;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
-import com.carrotsearch.hppc.LongArrayList;
-import com.carrotsearch.hppc.LongSet;
-import com.carrotsearch.hppc.cursors.LongCursor;
-
 public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
 {
     protected int numBlocks;
@@ -65,7 +64,7 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
     public int serializedSize()
     {
         if (numBlocks == 1)
-            return (BLOCK_HEADER_BYTES + ((int) tokenCount * 16));
+            return BLOCK_HEADER_BYTES + ((int) tokenCount * LEAF_ENTRY_BYTES);
         else
             return numBlocks * BLOCK_BYTES;
     }
@@ -112,6 +111,15 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
         buffer.clear();
     }
 
+    /**
+     * Tree node,
+     *
+     * B+-tree consists of root, interior nodes and leaves. Root can be either a node or a leaf.
+     *
+     * Depending on the concrete implementation of {@code TokenTreeBuilder},
+     * a leaf can be partial or static (in the case of {@code StaticTokenTreeBuilder}) or dynamic
+     * (in the case of {@code DynamicTokenTreeBuilder}).
+     */
     protected abstract class Node
     {
         protected InteriorNode parent;
@@ -179,8 +187,16 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
             alignBuffer(buf, BLOCK_HEADER_BYTES);
         }
 
+        /**
+         * Shared header part, written for all node types:
+         *     [  info byte  ] [  token count   ] [ min node token ] [ max node token ]
+         *     [      1b     ] [    2b (short)  ] [   8b (long)    ] [    8b (long)   ]
+         **/
         private abstract class Header
         {
+            /**
+             * Serializes the shared part of the header
+             */
             public void serialize(ByteBuffer buf)
             {
                 buf.put(infoByte())
@@ -192,6 +208,12 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
             protected abstract byte infoByte();
         }
 
+        /**
+         * In addition to shared header part, root header stores version information,
+         * overall token count and min/max tokens for the whole tree:
+         *     [      magic    ] [  overall token count  ] [ min tree token ] [ max tree token ]
+         *     [   2b (short)  ] [         8b (long)     ] [   8b (long)    ] [    8b (long)   ]
+         */
         private class RootHeader extends Header
         {
             public void serialize(ByteBuffer buf)
@@ -207,19 +229,21 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
             {
                 // if leaf, set leaf indicator and last leaf indicator (bits 0 & 1)
                 // if not leaf, clear both bits
-                return (byte) ((isLeaf()) ? 3 : 0);
+                return isLeaf() ? ENTRY_TYPE_MASK : 0;
             }
 
             protected void writeMagic(ByteBuffer buf)
             {
                 switch (Descriptor.CURRENT_VERSION)
                 {
-                    case Descriptor.VERSION_AB:
+                    case ab:
                         buf.putShort(AB_MAGIC);
                         break;
-
-                    default:
+                    case ac:
+                        buf.putShort(AC_MAGIC);
                         break;
+                    default:
+                        throw new RuntimeException("Unsupported version");
                 }
 
             }
@@ -249,6 +273,12 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
 
     }
 
+    /**
+     * Leaf consists of
+     *   - header (format described in {@code Header} )
+     *   - data (format described in {@code LeafEntry})
+     *   - overflow collision entries, holding up to {@value OVERFLOW_TRAILER_CAPACITY} {@code RowOffset} entries.
+     */
     protected abstract class Leaf extends Node
     {
         protected LongArrayList overflowCollisions;
@@ -279,82 +309,98 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
 
         protected abstract void serializeData(ByteBuffer buf);
 
-        protected LeafEntry createEntry(final long tok, final LongSet offsets)
+        protected LeafEntry createEntry(final long tok, final KeyOffsets offsets)
         {
-            int offsetCount = offsets.size();
+            LongArrayList rawOffsets = new LongArrayList(offsets.size());
+
+            offsets.forEach(new Consumer<LongObjectCursor<long[]>>()
+            {
+                public void accept(LongObjectCursor<long[]> cursor)
+                {
+                    for (long l : cursor.value)
+                    {
+                        rawOffsets.add(cursor.key);
+                        rawOffsets.add(l);
+                    }
+                }
+            });
+
+            int offsetCount = rawOffsets.size();
             switch (offsetCount)
             {
                 case 0:
                     throw new AssertionError("no offsets for token " + tok);
-                case 1:
-                    long offset = offsets.toArray()[0];
-                    if (offset > MAX_OFFSET)
-                        throw new AssertionError("offset " + offset + " cannot be greater than " + MAX_OFFSET);
-                    else if (offset <= Integer.MAX_VALUE)
-                        return new SimpleLeafEntry(tok, offset);
-                    else
-                        return new FactoredOffsetLeafEntry(tok, offset);
                 case 2:
-                    long[] rawOffsets = offsets.toArray();
-                    if (rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= Integer.MAX_VALUE &&
-                        (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= Short.MAX_VALUE))
-                        return new PackedCollisionLeafEntry(tok, rawOffsets);
-                    else
-                        return createOverflowEntry(tok, offsetCount, offsets);
+                    return new SimpleLeafEntry(tok, rawOffsets.get(0), rawOffsets.get(1));
                 default:
-                    return createOverflowEntry(tok, offsetCount, offsets);
+                    assert offsetCount % 2 == 0;
+                    if (offsetCount == 4)
+                    {
+                        if (rawOffsets.get(0) < Integer.MAX_VALUE && rawOffsets.get(1) < Integer.MAX_VALUE &&
+                            rawOffsets.get(2) < Integer.MAX_VALUE && rawOffsets.get(3) < Integer.MAX_VALUE)
+                        {
+                            return new PackedCollisionLeafEntry(tok, (int)rawOffsets.get(0), (int) rawOffsets.get(1),
+                                                                (int) rawOffsets.get(2), (int) rawOffsets.get(3));
+                        }
+                    }
+                    return createOverflowEntry(tok, offsetCount, rawOffsets);
             }
         }
 
-        private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongSet offsets)
+        private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongArrayList offsets)
         {
             if (overflowCollisions == null)
-                overflowCollisions = new LongArrayList();
+                overflowCollisions = new LongArrayList(offsetCount);
 
-            LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) overflowCollisions.size(), (short) offsetCount);
-            for (LongCursor o : offsets)
-            {
-                if (overflowCollisions.size() == OVERFLOW_TRAILER_CAPACITY)
-                    throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf");
-                else
-                    overflowCollisions.add(o.value);
-            }
+            int overflowCount = (overflowCollisions.size() + offsetCount) / 2;
+            if (overflowCount >= OVERFLOW_TRAILER_CAPACITY)
+                throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf, but had: " + overflowCount);
+
+            LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) (overflowCollisions.size() / 2), (short) (offsetCount / 2));
+            overflowCollisions.addAll(offsets);
             return entry;
         }
 
+        /**
+         * A leaf of the B+-Tree, that holds information about the row offset(s) for
+         * the current token.
+         *
+         * Main 3 types of leaf entries are:
+         *   1) simple leaf entry: holding just a single row offset
+         *   2) packed collision leaf entry: holding two entries that would fit together into 168 bytes
+     *   3) overflow entry: only holds an offset into the overflow trailer and the number of entries belonging to this leaf
+         */
         protected abstract class LeafEntry
         {
             protected final long token;
 
             abstract public EntryType type();
-            abstract public int offsetData();
-            abstract public short offsetExtra();
 
             public LeafEntry(final long tok)
             {
                 token = tok;
             }
 
-            public void serialize(ByteBuffer buf)
-            {
-                buf.putShort((short) type().ordinal())
-                   .putShort(offsetExtra())
-                   .putLong(token)
-                   .putInt(offsetData());
-            }
+            public abstract void serialize(ByteBuffer buf);
 
         }
 
-
-        // assumes there is a single offset and the offset is <= Integer.MAX_VALUE
+        /**
+         * Simple leaf, that can store a single row offset, having the following format:
+         *
+         *     [    type    ] [   token   ] [ partition offset ] [ row offset ]
+         *     [ 2b (short) ] [ 8b (long) ] [     8b (long)    ] [  8b (long) ]
+         */
         protected class SimpleLeafEntry extends LeafEntry
         {
-            private final long offset;
+            private final long partitionOffset;
+            private final long rowOffset;
 
-            public SimpleLeafEntry(final long tok, final long off)
+            public SimpleLeafEntry(final long tok, final long partitionOffset, final long rowOffset)
             {
                 super(tok);
-                offset = off;
+                this.partitionOffset = partitionOffset;
+                this.rowOffset = rowOffset;
             }
 
             public EntryType type()
@@ -362,61 +408,38 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
                 return EntryType.SIMPLE;
             }
 
-            public int offsetData()
-            {
-                return (int) offset;
-            }
-
-            public short offsetExtra()
-            {
-                return 0;
-            }
-        }
-
-        // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET
-        // take the middle 32 bits of offset (or the top 32 when considering offset is max 48 bits)
-        // and store where offset is normally stored. take bottom 16 bits of offset and store in entry header
-        private class FactoredOffsetLeafEntry extends LeafEntry
-        {
-            private final long offset;
-
-            public FactoredOffsetLeafEntry(final long tok, final long off)
-            {
-                super(tok);
-                offset = off;
-            }
-
-            public EntryType type()
-            {
-                return EntryType.FACTORED;
-            }
-
-            public int offsetData()
-            {
-                return (int) (offset >>> Short.SIZE);
-            }
-
-            public short offsetExtra()
+            @Override
+            public void serialize(ByteBuffer buf)
             {
-                // exta offset is supposed to be an unsigned 16-bit integer
-                return (short) offset;
+                buf.putShort((short) type().ordinal())
+                   .putLong(token)
+                   .putLong(partitionOffset)
+                   .putLong(rowOffset);
             }
         }
 
-        // holds an entry with two offsets that can be packed in an int & a short
-        // the int offset is stored where offset is normally stored. short offset is
-        // stored in entry header
-        private class PackedCollisionLeafEntry extends LeafEntry
+        /**
+         * Packed collision entry, can store two offsets, if each one of their positions
+         * fits into 4 bytes.
+         *     [    type    ] [   token   ] [ partition offset 1 ] [ row offset  1] [ partition offset 2 ] [ row offset  2]
+         *     [ 2b (short) ] [ 8b (long) ] [      4b (int)      ] [    4b (int)  ] [      4b (int)      ] [    4b (int)  ]
+         */
+        protected class PackedCollisionLeafEntry extends LeafEntry
         {
-            private short smallerOffset;
-            private int largerOffset;
+            private final int partitionOffset1;
+            private final int rowOffset1;
+            private final int partitionOffset2;
+            private final int rowOffset2;
 
-            public PackedCollisionLeafEntry(final long tok, final long[] offs)
+            public PackedCollisionLeafEntry(final long tok, final int partitionOffset1, final int rowOffset1,
+                                            final int partitionOffset2, final int rowOffset2)
             {
                 super(tok);
+                this.partitionOffset1 = partitionOffset1;
+                this.rowOffset1 = rowOffset1;
+                this.partitionOffset2 = partitionOffset2;
+                this.rowOffset2 = rowOffset2;
 
-                smallerOffset = (short) Math.min(offs[0], offs[1]);
-                largerOffset = (int) Math.max(offs[0], offs[1]);
             }
 
             public EntryType type()
@@ -424,21 +447,27 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
                 return EntryType.PACKED;
             }
 
-            public int offsetData()
-            {
-                return largerOffset;
-            }
-
-            public short offsetExtra()
+            @Override
+            public void serialize(ByteBuffer buf)
             {
-                return smallerOffset;
-            }
-        }
-
-        // holds an entry with three or more offsets, or two offsets that cannot
-        // be packed into an int & a short. the index into the overflow list
-        // is stored where the offset is normally stored. the number of overflowed offsets
-        // for the entry is stored in the entry header
+                buf.putShort((short) type().ordinal())
+                   .putLong(token)
+                   .putInt(partitionOffset1)
+                   .putInt(rowOffset1)
+                   .putInt(partitionOffset2)
+                   .putInt(rowOffset2);
+            }
+        }
+
+        /**
+         * Overflow collision entry, holds an entry with three or more offsets, or two offsets
+         * that cannot be packed into 16 bytes.
+         *     [    type    ] [   token   ] [ start index ] [    count    ]
+         *     [ 2b (short) ] [ 8b (long) ] [   8b (long) ] [  8b (long)  ]
+         *
+         *   - [ start index ] is the position of the first item belonging to this leaf entry in the overflow trailer
+         *   - [ count ] is the amount of items belonging to this leaf entry that are stored in the overflow trailer
+         */
         private class OverflowCollisionLeafEntry extends LeafEntry
         {
             private final short startIndex;
@@ -456,20 +485,23 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
                 return EntryType.OVERFLOW;
             }
 
-            public int offsetData()
-            {
-                return startIndex;
-            }
-
-            public short offsetExtra()
+            @Override
+            public void serialize(ByteBuffer buf)
             {
-                return count;
+                buf.putShort((short) type().ordinal())
+                   .putLong(token)
+                   .putLong(startIndex)
+                   .putLong(count);
             }
-
         }
-
     }
 
+    /**
+     * Interior node consists of:
+     *    - (interior node) header
+     *    - tokens (serialized as longs, with count stored in header)
+     *    - child offsets
+     */
     protected class InteriorNode extends Node
     {
         protected List<Long> tokens = new ArrayList<>(TOKENS_PER_BLOCK);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
index 3aa6f14..3fa0f06 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
@@ -22,30 +22,29 @@ package org.apache.cassandra.index.sasi.disk;
  */
 public class Descriptor
 {
-    public static final String VERSION_AA = "aa";
-    public static final String VERSION_AB = "ab";
-    public static final String CURRENT_VERSION = VERSION_AB;
-    public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION);
-
-    public static class Version
+    public static enum Version
     {
-        public final String version;
+        aa,
+        ab,
+        ac
+    }
 
-        public Version(String version)
-        {
-            this.version = version;
-        }
+    public static final Version VERSION_AA = Version.aa;
+    public static final Version VERSION_AB = Version.ab;
+    public static final Version VERSION_AC = Version.ac;
 
-        public String toString()
-        {
-            return version;
-        }
-    }
+    public static final Version CURRENT_VERSION = Version.ac;
+    public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION);
 
     public final Version version;
 
     public Descriptor(String v)
     {
-        this.version = new Version(v);
+        this.version = Version.valueOf(v);
+    }
+
+    public Descriptor(Version v)
+    {
+        this.version = v;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
index 2ddfd89..6e3e163 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
@@ -20,17 +20,14 @@ package org.apache.cassandra.index.sasi.disk;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.utils.AbstractIterator;
-import org.apache.cassandra.utils.Pair;
-
-import com.carrotsearch.hppc.LongOpenHashSet;
-import com.carrotsearch.hppc.LongSet;
-import com.carrotsearch.hppc.cursors.LongCursor;
+import com.carrotsearch.hppc.cursors.LongObjectCursor;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.utils.*;
 
 public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
 {
-    private final SortedMap<Long, LongSet> tokens = new TreeMap<>();
 
+    private final SortedMap<Long, KeyOffsets> tokens = new TreeMap<>();
 
     public DynamicTokenTreeBuilder()
     {}
@@ -40,54 +37,52 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
         add(data);
     }
 
-    public DynamicTokenTreeBuilder(SortedMap<Long, LongSet> data)
+    public DynamicTokenTreeBuilder(SortedMap<Long, KeyOffsets> data)
     {
         add(data);
     }
 
-    public void add(Long token, long keyPosition)
+    public void add(Long token, long partitionOffset, long rowOffset)
     {
-        LongSet found = tokens.get(token);
+        KeyOffsets found = tokens.get(token);
         if (found == null)
-            tokens.put(token, (found = new LongOpenHashSet(2)));
+            tokens.put(token, (found = new KeyOffsets(2)));
 
-        found.add(keyPosition);
+        found.put(partitionOffset, rowOffset);
     }
 
-    public void add(Iterator<Pair<Long, LongSet>> data)
+    public void add(Iterator<Pair<Long, KeyOffsets>> data)
     {
         while (data.hasNext())
         {
-            Pair<Long, LongSet> entry = data.next();
-            for (LongCursor l : entry.right)
-                add(entry.left, l.value);
+            Pair<Long, KeyOffsets> entry = data.next();
+            for (LongObjectCursor<long[]> cursor : entry.right)
+                for (long l : cursor.value)
+                    add(entry.left, cursor.key, l);
         }
     }
 
-    public void add(SortedMap<Long, LongSet> data)
+    public void add(SortedMap<Long, KeyOffsets> data)
     {
-        for (Map.Entry<Long, LongSet> newEntry : data.entrySet())
+        for (Map.Entry<Long, KeyOffsets> newEntry : data.entrySet())
         {
-            LongSet found = tokens.get(newEntry.getKey());
-            if (found == null)
-                tokens.put(newEntry.getKey(), (found = new LongOpenHashSet(4)));
-
-            for (LongCursor offset : newEntry.getValue())
-                found.add(offset.value);
+            for (LongObjectCursor<long[]> cursor : newEntry.getValue())
+                for (long l : cursor.value)
+                    add(newEntry.getKey(), cursor.key, l);
         }
     }
 
-    public Iterator<Pair<Long, LongSet>> iterator()
+    public Iterator<Pair<Long, KeyOffsets>> iterator()
     {
-        final Iterator<Map.Entry<Long, LongSet>> iterator = tokens.entrySet().iterator();
-        return new AbstractIterator<Pair<Long, LongSet>>()
+        final Iterator<Map.Entry<Long, KeyOffsets>> iterator = tokens.entrySet().iterator();
+        return new AbstractIterator<Pair<Long, KeyOffsets>>()
         {
-            protected Pair<Long, LongSet> computeNext()
+            protected Pair<Long, KeyOffsets> computeNext()
             {
                 if (!iterator.hasNext())
                     return endOfData();
 
-                Map.Entry<Long, LongSet> entry = iterator.next();
+                Map.Entry<Long, KeyOffsets> entry = iterator.next();
                 return Pair.create(entry.getKey(), entry.getValue());
             }
         };
@@ -161,9 +156,9 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
 
     private class DynamicLeaf extends Leaf
     {
-        private final SortedMap<Long, LongSet> tokens;
+        private final SortedMap<Long, KeyOffsets> tokens;
 
-        DynamicLeaf(SortedMap<Long, LongSet> data)
+        DynamicLeaf(SortedMap<Long, KeyOffsets> data)
         {
             super(data.firstKey(), data.lastKey());
             tokens = data;
@@ -181,7 +176,7 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
 
         protected void serializeData(ByteBuffer buf)
         {
-            for (Map.Entry<Long, LongSet> entry : tokens.entrySet())
+            for (Map.Entry<Long, KeyOffsets> entry : tokens.entrySet())
                 createEntry(entry.getKey(), entry.getValue()).serialize(buf);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java b/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java
new file mode 100644
index 0000000..db849fe
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi.disk;
+
+import java.util.*;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import com.carrotsearch.hppc.LongObjectOpenHashMap;
+import com.carrotsearch.hppc.cursors.LongObjectCursor;
+
+public class KeyOffsets extends LongObjectOpenHashMap<long[]>
+{
+    public static final long NO_OFFSET = Long.MIN_VALUE;
+
+    public KeyOffsets() {
+        super(4);
+    }
+
+    public KeyOffsets(int initialCapacity) {
+        super(initialCapacity);
+    }
+
+    public void put(long currentPartitionOffset, long currentRowOffset)
+    {
+        if (containsKey(currentPartitionOffset))
+            super.put(currentPartitionOffset, append(get(currentPartitionOffset), currentRowOffset));
+        else
+            super.put(currentPartitionOffset, asArray(currentRowOffset));
+    }
+
+    public long[] put(long currentPartitionOffset, long[] currentRowOffset)
+    {
+        if (containsKey(currentPartitionOffset))
+            return super.put(currentPartitionOffset, merge(get(currentPartitionOffset), currentRowOffset));
+        else
+            return super.put(currentPartitionOffset, currentRowOffset);
+    }
+
+    public boolean equals(Object obj)
+    {
+        if (!(obj instanceof KeyOffsets))
+            return false;
+
+        KeyOffsets other = (KeyOffsets) obj;
+        if (other.size() != this.size())
+            return false;
+
+        for (LongObjectCursor<long[]> cursor : this)
+            if (!Arrays.equals(cursor.value, other.get(cursor.key)))
+                return false;
+
+        return true;
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder("KeyOffsets { ");
+        forEach((a, b) -> {
+            sb.append(a).append(": ").append(Arrays.toString(b));
+        });
+        sb.append(" }");
+        return sb.toString();
+    }
+
+    // primitive array creation
+    public static long[] asArray(long... vals)
+    {
+        return vals;
+    }
+
+    private static long[] merge(long[] arr1, long[] arr2)
+    {
+        long[] copy = new long[arr2.length];
+        int written = 0;
+        for (long l : arr2)
+        {
+            if (!ArrayUtils.contains(arr1, l))
+                copy[written++] = l;
+        }
+
+        if (written == 0)
+            return arr1;
+
+        long[] merged = new long[arr1.length + written];
+        System.arraycopy(arr1, 0, merged, 0, arr1.length);
+        System.arraycopy(copy, 0, merged, arr1.length, written);
+        return merged;
+    }
+
+    private static long[] append(long[] arr1, long v)
+    {
+        if (ArrayUtils.contains(arr1, v))
+            return arr1;
+        else
+            return ArrayUtils.add(arr1, v);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
index 4d43cd9..70d24a7 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
@@ -22,8 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.stream.Collectors;
 
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.index.sasi.Term;
+import org.apache.cassandra.index.sasi.*;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.plan.Expression.Op;
 import org.apache.cassandra.index.sasi.utils.MappedBuffer;
@@ -37,12 +36,12 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 
 import static org.apache.cassandra.index.sasi.disk.OnDiskBlock.SearchResult;
+import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.TOKEN_BYTES;
 
 public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
 {
@@ -106,7 +105,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
     protected final long indexSize;
     protected final boolean hasMarkedPartials;
 
-    protected final Function<Long, DecoratedKey> keyFetcher;
+    protected final KeyFetcher keyFetcher;
 
     protected final String indexPath;
 
@@ -116,7 +115,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
     protected final ByteBuffer minTerm, maxTerm, minKey, maxKey;
 
     @SuppressWarnings("resource")
-    public OnDiskIndex(File index, AbstractType<?> cmp, Function<Long, DecoratedKey> keyReader)
+    public OnDiskIndex(File index, AbstractType<?> cmp, KeyFetcher keyReader)
     {
         keyFetcher = keyReader;
 
@@ -635,6 +634,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
         {
             final long blockEnd = FBUtilities.align(content.position(), OnDiskIndexBuilder.BLOCK_SIZE);
 
+            // ([int] -1 for sparse, offset for non-sparse)
             if (isSparse())
                 return new PrefetchedTokensIterator(getSparseTokens());
 
@@ -658,7 +658,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
             NavigableMap<Long, Token> individualTokens = new TreeMap<>();
             for (int i = 0; i < size; i++)
             {
-                Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + (8 * i)), keyFetcher);
+                Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + TOKEN_BYTES * i), keyFetcher);
 
                 assert token != null;
                 individualTokens.put(token.get(), token);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
index 4946f06..b6e2da5 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.index.sasi.plan.Expression.Op;
 import org.apache.cassandra.index.sasi.sa.IndexedTerm;
 import org.apache.cassandra.index.sasi.sa.IntegralSA;
@@ -37,7 +38,6 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 import com.carrotsearch.hppc.LongArrayList;
-import com.carrotsearch.hppc.LongSet;
 import com.carrotsearch.hppc.ShortArrayList;
 import com.google.common.annotations.VisibleForTesting;
 
@@ -163,7 +163,7 @@ public class OnDiskIndexBuilder
         this.marksPartials = marksPartials;
     }
 
-    public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long keyPosition)
+    public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long partitionOffset, long rowOffset)
     {
         if (term.remaining() >= MAX_TERM_SIZE)
         {
@@ -183,16 +183,16 @@ public class OnDiskIndexBuilder
             estimatedBytes += 64 + 48 + term.remaining();
         }
 
-        tokens.add((Long) key.getToken().getTokenValue(), keyPosition);
+        tokens.add((Long) key.getToken().getTokenValue(), partitionOffset, rowOffset);
 
         // calculate key range (based on actual key values) for current index
         minKey = (minKey == null || keyComparator.compare(minKey, key.getKey()) > 0) ? key.getKey() : minKey;
         maxKey = (maxKey == null || keyComparator.compare(maxKey, key.getKey()) < 0) ? key.getKey() : maxKey;
 
-        // 60 ((boolean(1)*4) + (long(8)*4) + 24) bytes for the LongOpenHashSet created when the keyPosition was added
-        // + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key).
+        // 84 ((boolean(1)*4) + (long(8)*4) + 24 + 24) bytes for the LongObjectOpenHashMap<long[]> created
+        // when the keyPosition was added + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key).
         // in the case of hash collision for the token we may overestimate but this is extremely rare
-        estimatedBytes += 60 + 40 + 8;
+        estimatedBytes += 84 + 40 + 8;
 
         return this;
     }
@@ -569,7 +569,7 @@ public class OnDiskIndexBuilder
         }
     }
 
-    private static class MutableDataBlock extends MutableBlock<InMemoryDataTerm>
+    private class MutableDataBlock extends MutableBlock<InMemoryDataTerm>
     {
         private static final int MAX_KEYS_SPARSE = 5;
 
@@ -651,7 +651,7 @@ public class OnDiskIndexBuilder
         {
             term.serialize(buffer);
             buffer.writeByte((byte) keys.getTokenCount());
-            for (Pair<Long, LongSet> key : keys)
+            for (Pair<Long, KeyOffsets> key : keys)
                 buffer.writeLong(key.left);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
index 9fa4e87..c204883 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
@@ -109,7 +109,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
         currentKeyPosition = curPosition;
     }
 
-    public void nextUnfilteredCluster(Unfiltered unfiltered)
+    public void nextUnfilteredCluster(Unfiltered unfiltered, long currentRowOffset)
     {
         if (!unfiltered.isRow())
             return;
@@ -129,10 +129,15 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
             if (index == null)
                 indexes.put(column, (index = newIndex(columnIndex)));
 
-            index.add(value.duplicate(), currentKey, currentKeyPosition);
+            index.add(value.duplicate(), currentKey, currentKeyPosition, currentRowOffset);
         });
     }
 
+    public void nextUnfilteredCluster(Unfiltered unfilteredCluster)
+    {
+        throw new UnsupportedOperationException("SASI Index does not support direct row access.");
+    }
+
     public void complete()
     {
         if (isComplete)
@@ -197,7 +202,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
             this.currentBuilder = newIndexBuilder();
         }
 
-        public void add(ByteBuffer term, DecoratedKey key, long keyPosition)
+        public void add(ByteBuffer term, DecoratedKey key, long partitionOffset, long rowOffset)
         {
             if (term.remaining() == 0)
                 return;
@@ -235,7 +240,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
                     }
                 }
 
-                currentBuilder.add(token, key, keyPosition);
+                currentBuilder.add(token, key, partitionOffset, rowOffset);
                 isAdded = true;
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java b/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java
new file mode 100644
index 0000000..518ad27
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi.disk;
+
+import java.util.*;
+import java.util.stream.*;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Primary key of the found row, a combination of the Partition Key
+ * and clustering that belongs to the row.
+ */
+public class RowKey implements Comparable<RowKey>
+{
+
+    public final DecoratedKey decoratedKey;
+    public final Clustering clustering;
+
+    private final ClusteringComparator comparator;
+
+    public RowKey(DecoratedKey primaryKey, Clustering clustering, ClusteringComparator comparator)
+    {
+        this.decoratedKey = primaryKey;
+        this.clustering = clustering;
+        this.comparator = comparator;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        RowKey rowKey = (RowKey) o;
+
+        if (decoratedKey != null ? !decoratedKey.equals(rowKey.decoratedKey) : rowKey.decoratedKey != null)
+            return false;
+        return clustering != null ? clustering.equals(rowKey.clustering) : rowKey.clustering == null;
+    }
+
+    public int hashCode()
+    {
+        return new HashCodeBuilder().append(decoratedKey).append(clustering).hashCode();
+    }
+
+    public int compareTo(RowKey other)
+    {
+        int cmp = this.decoratedKey.compareTo(other.decoratedKey);
+        if (cmp == 0 && clustering != null)
+        {
+            // Both clustering and rows should match
+            if (clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING || other.clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING)
+                return 0;
+
+            return comparator.compare(this.clustering, other.clustering);
+        }
+        else
+        {
+            return cmp;
+        }
+    }
+
+    public static RowKeyComparator COMPARATOR = new RowKeyComparator();
+
+    public String toString(CFMetaData metadata)
+    {
+        return String.format("RowKey: { pk : %s, clustering: %s}",
+                             metadata.getKeyValidator().getString(decoratedKey.getKey()),
+                             clustering.toString(metadata));
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("RowKey: { pk : %s, clustering: %s}",
+                             ByteBufferUtil.bytesToHex(decoratedKey.getKey()),
+                             String.join(",", Arrays.stream(clustering.getRawValues()).map(ByteBufferUtil::bytesToHex).collect(Collectors.toList())));
+    }
+
+    private static class RowKeyComparator implements Comparator<RowKey>
+    {
+        public int compare(RowKey o1, RowKey o2)
+        {
+            return o1.compareTo(o2);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
index 6e64c56..8a11d60 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
@@ -19,8 +19,7 @@ package org.apache.cassandra.index.sasi.disk;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.SortedMap;
+import java.util.*;
 
 import org.apache.cassandra.index.sasi.utils.CombinedTerm;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
@@ -28,7 +27,6 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.Pair;
 
-import com.carrotsearch.hppc.LongSet;
 import com.google.common.collect.Iterators;
 
 /**
@@ -63,17 +61,17 @@ public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder
         combinedTerm = term;
     }
 
-    public void add(Long token, long keyPosition)
+    public void add(Long token, long partitionOffset, long rowOffset)
     {
         throw new UnsupportedOperationException();
     }
 
-    public void add(SortedMap<Long, LongSet> data)
+    public void add(SortedMap<Long, KeyOffsets> data)
     {
         throw new UnsupportedOperationException();
     }
 
-    public void add(Iterator<Pair<Long, LongSet>> data)
+    public void add(Iterator<Pair<Long, KeyOffsets>> data)
     {
         throw new UnsupportedOperationException();
     }
@@ -83,12 +81,12 @@ public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder
         return tokenCount == 0;
     }
 
-    public Iterator<Pair<Long, LongSet>> iterator()
+    public Iterator<Pair<Long, KeyOffsets>> iterator()
     {
-        Iterator<Token> iterator = combinedTerm.getTokenIterator();
-        return new AbstractIterator<Pair<Long, LongSet>>()
+        @SuppressWarnings("resource") Iterator<Token> iterator = combinedTerm.getTokenIterator();
+        return new AbstractIterator<Pair<Long, KeyOffsets>>()
         {
-            protected Pair<Long, LongSet> computeNext()
+            protected Pair<Long, KeyOffsets> computeNext()
             {
                 if (!iterator.hasNext())
                     return endOfData();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Token.java b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
index 4cd1ea3..2412477 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/Token.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
@@ -18,13 +18,11 @@
 package org.apache.cassandra.index.sasi.disk;
 
 import com.google.common.primitives.Longs;
+import org.apache.commons.lang.builder.HashCodeBuilder;
 
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.index.sasi.utils.CombinedValue;
+import org.apache.cassandra.index.sasi.utils.*;
 
-import com.carrotsearch.hppc.LongSet;
-
-public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKey>
+public abstract class Token implements CombinedValue<Long>, Iterable<RowKey>
 {
     protected final long token;
 
@@ -38,7 +36,7 @@ public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKe
         return token;
     }
 
-    public abstract LongSet getOffsets();
+    public abstract KeyOffsets getOffsets();
 
     public int compareTo(CombinedValue<Long> o)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
index c69ce00..1969627 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
@@ -19,22 +19,21 @@ package org.apache.cassandra.index.sasi.disk;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.stream.*;
 
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.index.sasi.utils.AbstractIterator;
-import org.apache.cassandra.index.sasi.utils.CombinedValue;
-import org.apache.cassandra.index.sasi.utils.MappedBuffer;
-import org.apache.cassandra.index.sasi.utils.RangeIterator;
-import org.apache.cassandra.utils.MergeIterator;
-
-import com.carrotsearch.hppc.LongOpenHashSet;
-import com.carrotsearch.hppc.LongSet;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
-import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType;
+import com.carrotsearch.hppc.cursors.LongObjectCursor;
+import org.apache.cassandra.index.sasi.*;
+import org.apache.cassandra.index.sasi.disk.Descriptor.*;
+import org.apache.cassandra.index.sasi.utils.AbstractIterator;
+import org.apache.cassandra.index.sasi.utils.*;
+import org.apache.cassandra.utils.*;
+
+import static org.apache.cassandra.index.sasi.disk.Descriptor.Version.*;
+import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.*;
 
 // Note: all of the seek-able offsets contained in TokenTree should be sizeof(long)
 // even if currently only lower int portion of them if used, because that makes
@@ -42,9 +41,6 @@ import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType;
 // without any on-disk format changes and/or re-indexing if one day we'll have a need to.
 public class TokenTree
 {
-    private static final int LONG_BYTES = Long.SIZE / 8;
-    private static final int SHORT_BYTES = Short.SIZE / 8;
-
     private final Descriptor descriptor;
     private final MappedBuffer file;
     private final long startPos;
@@ -66,8 +62,7 @@ public class TokenTree
 
         file.position(startPos + TokenTreeBuilder.SHARED_HEADER_BYTES);
 
-        if (!validateMagic())
-            throw new IllegalArgumentException("invalid token tree");
+        validateMagic();
 
         tokenCount = file.getLong();
         treeMinToken = file.getLong();
@@ -79,12 +74,12 @@ public class TokenTree
         return tokenCount;
     }
 
-    public RangeIterator<Long, Token> iterator(Function<Long, DecoratedKey> keyFetcher)
+    public RangeIterator<Long, Token> iterator(KeyFetcher keyFetcher)
     {
         return new TokenTreeIterator(file.duplicate(), keyFetcher);
     }
 
-    public OnDiskToken get(final long searchToken, Function<Long, DecoratedKey> keyFetcher)
+    public OnDiskToken get(final long searchToken, KeyFetcher keyFetcher)
     {
         seekToLeaf(searchToken, file);
         long leafStart = file.position();
@@ -95,21 +90,24 @@ public class TokenTree
 
         file.position(leafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES);
 
-        OnDiskToken token = OnDiskToken.getTokenAt(file, tokenIndex, leafSize, keyFetcher);
+        OnDiskToken token = getTokenAt(file, tokenIndex, leafSize, keyFetcher);
+
         return token.get().equals(searchToken) ? token : null;
     }
 
-    private boolean validateMagic()
+    private void validateMagic()
     {
-        switch (descriptor.version.toString())
-        {
-            case Descriptor.VERSION_AA:
-                return true;
-            case Descriptor.VERSION_AB:
-                return TokenTreeBuilder.AB_MAGIC == file.getShort();
-            default:
-                return false;
-        }
+        if (descriptor.version == aa)
+            return;
+
+        short magic = file.getShort();
+        if (descriptor.version == Version.ab && magic == TokenTreeBuilder.AB_MAGIC)
+            return;
+
+        if (descriptor.version == Version.ac && magic == TokenTreeBuilder.AC_MAGIC)
+            return;
+
+        throw new IllegalArgumentException("invalid token tree. Written magic: '" + ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(magic)) + "'");
     }
 
     // finds leaf that *could* contain token
@@ -136,18 +134,16 @@ public class TokenTree
             long minToken = file.getLong();
             long maxToken = file.getLong();
 
-            long seekBase = blockStart + TokenTreeBuilder.BLOCK_HEADER_BYTES;
+            long seekBase = blockStart + BLOCK_HEADER_BYTES;
             if (minToken > token)
             {
                 // seek to beginning of child offsets to locate first child
-                file.position(seekBase + tokenCount * LONG_BYTES);
-                blockStart = (startPos + (int) file.getLong());
+                file.position(seekBase + tokenCount * TOKEN_BYTES);
             }
             else if (maxToken < token)
             {
                 // seek to end of child offsets to locate last child
-                file.position(seekBase + (2 * tokenCount) * LONG_BYTES);
-                blockStart = (startPos + (int) file.getLong());
+                file.position(seekBase + (2 * tokenCount) * TOKEN_BYTES);
             }
             else
             {
@@ -158,12 +154,11 @@ public class TokenTree
 
                 // file pointer is now at beginning of offsets
                 if (offsetIndex == tokenCount)
-                    file.position(file.position() + (offsetIndex * LONG_BYTES));
+                    file.position(file.position() + (offsetIndex * TOKEN_BYTES));
                 else
-                    file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * LONG_BYTES);
-
-                blockStart = (startPos + (int) file.getLong());
+                    file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * TOKEN_BYTES);
             }
+            blockStart = (startPos + (int) file.getLong());
         }
     }
 
@@ -172,8 +167,7 @@ public class TokenTree
         short offsetIndex = 0;
         for (int i = 0; i < tokenCount; i++)
         {
-            long readToken = file.getLong();
-            if (searchToken < readToken)
+            if (searchToken < file.getLong())
                 break;
 
             offsetIndex++;
@@ -193,10 +187,7 @@ public class TokenTree
         while (start <= end)
         {
             middle = start + ((end - start) >> 1);
-
-            // each entry is 16 bytes wide, token is in bytes 4-11
-            long token = file.getLong(base + (middle * (2 * LONG_BYTES) + 4));
-
+            long token = file.getLong(base + middle * LEAF_ENTRY_BYTES + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES));
             if (token == searchToken)
                 break;
 
@@ -209,9 +200,9 @@ public class TokenTree
         return (short) middle;
     }
 
-    public class TokenTreeIterator extends RangeIterator<Long, Token>
+    private class TokenTreeIterator extends RangeIterator<Long, Token>
     {
-        private final Function<Long, DecoratedKey> keyFetcher;
+        private final KeyFetcher keyFetcher;
         private final MappedBuffer file;
 
         private long currentLeafStart;
@@ -224,7 +215,7 @@ public class TokenTree
         protected boolean firstIteration = true;
         private boolean lastLeaf;
 
-        TokenTreeIterator(MappedBuffer file, Function<Long, DecoratedKey> keyFetcher)
+        TokenTreeIterator(MappedBuffer file, KeyFetcher keyFetcher)
         {
             super(treeMinToken, treeMaxToken, tokenCount);
 
@@ -314,13 +305,13 @@ public class TokenTree
 
         private Token getTokenAt(int idx)
         {
-            return OnDiskToken.getTokenAt(file, idx, leafSize, keyFetcher);
+            return TokenTree.this.getTokenAt(file, idx, leafSize, keyFetcher);
         }
 
         private long getTokenPosition(int idx)
         {
-            // skip 4 byte entry header to get position pointing directly at the entry's token
-            return OnDiskToken.getEntryPosition(idx, file) + (2 * SHORT_BYTES);
+            // skip entry header to get position pointing directly at the entry's token
+            return TokenTree.this.getEntryPosition(idx, file, descriptor) + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES);
         }
 
         private void seekToNextLeaf()
@@ -347,15 +338,15 @@ public class TokenTree
         }
     }
 
-    public static class OnDiskToken extends Token
+    public class OnDiskToken extends Token
     {
         private final Set<TokenInfo> info = new HashSet<>(2);
-        private final Set<DecoratedKey> loadedKeys = new TreeSet<>(DecoratedKey.comparator);
+        private final Set<RowKey> loadedKeys = new TreeSet<>(RowKey.COMPARATOR);
 
-        public OnDiskToken(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
+        private OnDiskToken(MappedBuffer buffer, long position, short leafSize, KeyFetcher keyFetcher)
         {
-            super(buffer.getLong(position + (2 * SHORT_BYTES)));
-            info.add(new TokenInfo(buffer, position, leafSize, keyFetcher));
+            super(buffer.getLong(position + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES)));
+            info.add(new TokenInfo(buffer, position, leafSize, keyFetcher, descriptor));
         }
 
         public void merge(CombinedValue<Long> other)
@@ -377,9 +368,9 @@ public class TokenTree
             }
         }
 
-        public Iterator<DecoratedKey> iterator()
+        public Iterator<RowKey> iterator()
         {
-            List<Iterator<DecoratedKey>> keys = new ArrayList<>(info.size());
+            List<Iterator<RowKey>> keys = new ArrayList<>(info.size());
 
             for (TokenInfo i : info)
                 keys.add(i.iterator());
@@ -387,68 +378,72 @@ public class TokenTree
             if (!loadedKeys.isEmpty())
                 keys.add(loadedKeys.iterator());
 
-            return MergeIterator.get(keys, DecoratedKey.comparator, new MergeIterator.Reducer<DecoratedKey, DecoratedKey>()
+            return MergeIterator.get(keys, RowKey.COMPARATOR, new MergeIterator.Reducer<RowKey, RowKey>()
             {
-                DecoratedKey reduced = null;
+                RowKey reduced = null;
 
                 public boolean trivialReduceIsTrivial()
                 {
                     return true;
                 }
 
-                public void reduce(int idx, DecoratedKey current)
+                public void reduce(int idx, RowKey current)
                 {
                     reduced = current;
                 }
 
-                protected DecoratedKey getReduced()
+                protected RowKey getReduced()
                 {
                     return reduced;
                 }
             });
         }
 
-        public LongSet getOffsets()
+        public KeyOffsets getOffsets()
         {
-            LongSet offsets = new LongOpenHashSet(4);
+            KeyOffsets offsets = new KeyOffsets();
             for (TokenInfo i : info)
             {
-                for (long offset : i.fetchOffsets())
-                    offsets.add(offset);
+                for (LongObjectCursor<long[]> offset : i.fetchOffsets())
+                    offsets.put(offset.key, offset.value);
             }
 
             return offsets;
         }
+    }
 
-        public static OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, Function<Long, DecoratedKey> keyFetcher)
-        {
-            return new OnDiskToken(buffer, getEntryPosition(idx, buffer), leafSize, keyFetcher);
-        }
+    private OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, KeyFetcher keyFetcher)
+    {
+        return new OnDiskToken(buffer, getEntryPosition(idx, buffer, descriptor), leafSize, keyFetcher);
+    }
 
-        private static long getEntryPosition(int idx, MappedBuffer file)
-        {
-            // info (4 bytes) + token (8 bytes) + offset (4 bytes) = 16 bytes
-            return file.position() + (idx * (2 * LONG_BYTES));
-        }
+    private long getEntryPosition(int idx, MappedBuffer file, Descriptor descriptor)
+    {
+        if (descriptor.version.compareTo(Version.ac) < 0)
+            return file.position() + (idx * LEGACY_LEAF_ENTRY_BYTES);
+
+        // skip n entries, to the entry with the given index
+        return file.position() + (idx * LEAF_ENTRY_BYTES);
     }
 
     private static class TokenInfo
     {
         private final MappedBuffer buffer;
-        private final Function<Long, DecoratedKey> keyFetcher;
-
+        private final KeyFetcher keyFetcher;
+        private final Descriptor descriptor;
         private final long position;
         private final short leafSize;
 
-        public TokenInfo(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
+        public TokenInfo(MappedBuffer buffer, long position, short leafSize, KeyFetcher keyFetcher, Descriptor descriptor)
         {
             this.keyFetcher = keyFetcher;
             this.buffer = buffer;
             this.position = position;
             this.leafSize = leafSize;
+            this.descriptor = descriptor;
         }
 
-        public Iterator<DecoratedKey> iterator()
+        public Iterator<RowKey> iterator()
         {
             return new KeyIterator(keyFetcher, fetchOffsets());
         }
@@ -465,59 +460,154 @@ public class TokenTree
 
             TokenInfo o = (TokenInfo) other;
             return keyFetcher == o.keyFetcher && position == o.position;
+
         }
 
-        private long[] fetchOffsets()
+        /**
+         * Legacy leaf storage format (used for reading data formats before AC):
+         *
+         *    [(short) leaf type][(short) offset extra bytes][(long) token][(int) offsetData]
+         *
+         * Many pairs can be encoded into long+int.
+         *
+         * Simple entry: offset fits into (int)
+         *
+         *    [(short) leaf type][(short) offset extra bytes][(long) token][(int) offsetData]
+         *
+         * FactoredOffset: a single offset, offset fits into (long)+(int) bits:
+         *
+         *    [(short) leaf type][(short) 16 bytes of remained offset][(long) token][(int) top 32 bits of offset]
+         *
+         * PackedCollisionEntry: packs the two offset entries into int and a short (if both of them fit into
+         * (long) and one of them fits into (int))
+         *
+         *    [(short) leaf type][(short) 16 the offset that'd fit into short][(long) token][(int) 32 bits of offset that'd fit into int]
+         *
+         * Otherwise, the rest gets packed into limited-size overflow collision entry
+         *
+         *    [(short) leaf type][(short) count][(long) token][(int) start index]
+         */
+        private KeyOffsets fetchOffsetsLegacy()
         {
             short info = buffer.getShort(position);
             // offset extra is unsigned short (right-most 16 bits of 48 bits allowed for an offset)
-            int offsetExtra = buffer.getShort(position + SHORT_BYTES) & 0xFFFF;
+            int offsetExtra = buffer.getShort(position + Short.BYTES) & 0xFFFF;
             // is the it left-most (32-bit) base of the actual offset in the index file
-            int offsetData = buffer.getInt(position + (2 * SHORT_BYTES) + LONG_BYTES);
+            int offsetData = buffer.getInt(position + (2 * Short.BYTES) + Long.BYTES);
 
             EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK);
 
+            KeyOffsets rowOffsets = new KeyOffsets();
             switch (type)
             {
                 case SIMPLE:
-                    return new long[] { offsetData };
-
+                    rowOffsets.put(offsetData, KeyOffsets.NO_OFFSET);
+                    break;
                 case OVERFLOW:
-                    long[] offsets = new long[offsetExtra]; // offsetShort contains count of tokens
-                    long offsetPos = (buffer.position() + (2 * (leafSize * LONG_BYTES)) + (offsetData * LONG_BYTES));
+                    long offsetPos = (buffer.position() + (2 * (leafSize * Long.BYTES)) + (offsetData * Long.BYTES));
 
                     for (int i = 0; i < offsetExtra; i++)
-                        offsets[i] = buffer.getLong(offsetPos + (i * LONG_BYTES));
+                    {
+                        long offset = buffer.getLong(offsetPos + (i * Long.BYTES));
+                        rowOffsets.put(offset, KeyOffsets.NO_OFFSET);
+                    }
+                    break;
+                case FACTORED:
+                    long offset = (((long) offsetData) << Short.SIZE) + offsetExtra;
+                    rowOffsets.put(offset, KeyOffsets.NO_OFFSET);
+                    break;
+                case PACKED:
+                    rowOffsets.put(offsetExtra, KeyOffsets.NO_OFFSET);
+                    rowOffsets.put(offsetData, KeyOffsets.NO_OFFSET);
+                default:
+                    throw new IllegalStateException("Unknown entry type: " + type);
+            }
+            return rowOffsets;
+        }
 
-                    return offsets;
+        private KeyOffsets fetchOffsets()
+        {
+            if (descriptor.version.compareTo(Version.ac) < 0)
+                return fetchOffsetsLegacy();
 
-                case FACTORED:
-                    return new long[] { (((long) offsetData) << Short.SIZE) + offsetExtra };
+            short info = buffer.getShort(position);
+            EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK);
 
+            KeyOffsets rowOffsets = new KeyOffsets();
+            long baseOffset = position + LEAF_ENTRY_TYPE_BYTES + TOKEN_BYTES;
+            switch (type)
+            {
+                case SIMPLE:
+                    long partitionOffset = buffer.getLong(baseOffset);
+                    long rowOffset = buffer.getLong(baseOffset + LEAF_PARTITON_OFFSET_BYTES);
+
+                    rowOffsets.put(partitionOffset, rowOffset);
+                    break;
                 case PACKED:
-                    return new long[] { offsetExtra, offsetData };
+                    long partitionOffset1 = buffer.getInt(baseOffset);
+                    long rowOffset1 = buffer.getInt(baseOffset + LEAF_PARTITON_OFFSET_PACKED_BYTES);
+
+                    long partitionOffset2 = buffer.getInt(baseOffset + LEAF_PARTITON_OFFSET_PACKED_BYTES + LEAF_ROW_OFFSET_PACKED_BYTES);
+                    long rowOffset2 = buffer.getInt(baseOffset + 2 * LEAF_PARTITON_OFFSET_PACKED_BYTES + LEAF_ROW_OFFSET_PACKED_BYTES);
 
+                    rowOffsets.put(partitionOffset1, rowOffset1);
+                    rowOffsets.put(partitionOffset2, rowOffset2);
+                    break;
+                case OVERFLOW:
+                    long collisionOffset = buffer.getLong(baseOffset);
+                    long count = buffer.getLong(baseOffset + LEAF_PARTITON_OFFSET_BYTES);
+
+                    // Skip leaves and collision offsets that do not belong to current token
+                    long offsetPos = buffer.position() + leafSize * LEAF_ENTRY_BYTES + collisionOffset * COLLISION_ENTRY_BYTES;
+
+                    for (int i = 0; i < count; i++)
+                    {
+                        long currentPartitionOffset = buffer.getLong(offsetPos + i * COLLISION_ENTRY_BYTES);
+                        long currentRowOffset = buffer.getLong(offsetPos + i * COLLISION_ENTRY_BYTES + LEAF_PARTITON_OFFSET_BYTES);
+
+                        rowOffsets.put(currentPartitionOffset, currentRowOffset);
+                    }
+                    break;
                 default:
                     throw new IllegalStateException("Unknown entry type: " + type);
             }
+
+
+            return rowOffsets;
         }
     }
 
-    private static class KeyIterator extends AbstractIterator<DecoratedKey>
+    private static class KeyIterator extends AbstractIterator<RowKey>
     {
-        private final Function<Long, DecoratedKey> keyFetcher;
-        private final long[] offsets;
-        private int index = 0;
+        private final KeyFetcher keyFetcher;
+        private final Iterator<LongObjectCursor<long[]>> offsets;
+        private long currentPartitionKey;
+        private PrimitiveIterator.OfLong currentCursor = null;
 
-        public KeyIterator(Function<Long, DecoratedKey> keyFetcher, long[] offsets)
+        public KeyIterator(KeyFetcher keyFetcher, KeyOffsets offsets)
         {
             this.keyFetcher = keyFetcher;
-            this.offsets = offsets;
+            this.offsets = offsets.iterator();
         }
 
-        public DecoratedKey computeNext()
+        public RowKey computeNext()
         {
-            return index < offsets.length ? keyFetcher.apply(offsets[index++]) : endOfData();
+            if (currentCursor != null && currentCursor.hasNext())
+            {
+                return keyFetcher.getRowKey(currentPartitionKey, currentCursor.nextLong());
+            }
+            else if (offsets.hasNext())
+            {
+                LongObjectCursor<long[]> cursor = offsets.next();
+                currentPartitionKey = cursor.key;
+                currentCursor = LongStream.of(cursor.value).iterator();
+
+                return keyFetcher.getRowKey(currentPartitionKey, currentCursor.nextLong());
+            }
+            else
+            {
+                return endOfData();
+            }
         }
     }
-}
\ No newline at end of file
+}