You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2016/11/17 23:23:46 UTC

[1/3] cassandra git commit: Revert "Add row offset support to SASI"

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.X a1eef56cc -> 490c1c27c


http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
index 7c2498c..927e165 100644
--- a/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
@@ -19,31 +19,42 @@ package org.apache.cassandra.index.sasi.disk;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 
-import com.carrotsearch.hppc.cursors.LongObjectCursor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.BufferDecoratedKey;
-import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType;
-import org.apache.cassandra.index.sasi.utils.*;
-import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.index.sasi.utils.CombinedTerm;
+import org.apache.cassandra.index.sasi.utils.CombinedValue;
+import org.apache.cassandra.index.sasi.utils.MappedBuffer;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.SequentialWriterOption;
+import org.apache.cassandra.utils.MurmurHash;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
 
 import junit.framework.Assert;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
+import com.google.common.base.Function;
 
 public class TokenTreeTest
 {
-    private static final ClusteringComparator CLUSTERING_COMPARATOR = new ClusteringComparator(LongType.instance);
+    private static final Function<Long, DecoratedKey> KEY_CONVERTER = new KeyConverter();
 
     @BeforeClass
     public static void setupDD()
@@ -51,30 +62,14 @@ public class TokenTreeTest
         DatabaseDescriptor.daemonInitialization();
     }
 
-    static KeyOffsets singleOffset = new KeyOffsets() {{ put(1L, KeyOffsets.asArray(10L)); }};
-    static KeyOffsets bigSingleOffset = new KeyOffsets() {{ put(2147521562L, KeyOffsets.asArray(10)); }};
-    static KeyOffsets shortPackableCollision = new KeyOffsets() {{
-        put(2L, KeyOffsets.asArray(10));
-        put(3L, KeyOffsets.asArray(10));
-    }}; // can pack two shorts
-    static KeyOffsets intPackableCollision = new KeyOffsets()
-    {{
-        put(6L, KeyOffsets.asArray(10));
-        put(((long) Short.MAX_VALUE) + 1, KeyOffsets.asArray(10));
-    }}; // can pack int & short
-    static KeyOffsets multiCollision = new KeyOffsets()
-    {{
-        put(3L, KeyOffsets.asArray(10));
-        put(4L, KeyOffsets.asArray(10));
-        put(5L, KeyOffsets.asArray(10));
-    }}; // can't pack
-    static KeyOffsets unpackableCollision = new KeyOffsets()
-    {{
-        put(((long) Short.MAX_VALUE) + 1, KeyOffsets.asArray(10));
-        put(((long) Short.MAX_VALUE) + 2, KeyOffsets.asArray(10));
-    }}; // can't pack
+    static LongSet singleOffset = new LongOpenHashSet() {{ add(1); }};
+    static LongSet bigSingleOffset = new LongOpenHashSet() {{ add(2147521562L); }};
+    static LongSet shortPackableCollision = new LongOpenHashSet() {{ add(2L); add(3L); }}; // can pack two shorts
+    static LongSet intPackableCollision = new LongOpenHashSet() {{ add(6L); add(((long) Short.MAX_VALUE) + 1); }}; // can pack int & short
+    static LongSet multiCollision =  new LongOpenHashSet() {{ add(3L); add(4L); add(5L); }}; // can't pack
+    static LongSet unpackableCollision = new LongOpenHashSet() {{ add(((long) Short.MAX_VALUE) + 1); add(((long) Short.MAX_VALUE) + 2); }}; // can't pack
 
-    final static SortedMap<Long, KeyOffsets> simpleTokenMap = new TreeMap<Long, KeyOffsets>()
+    final static SortedMap<Long, LongSet> simpleTokenMap = new TreeMap<Long, LongSet>()
     {{
             put(1L, bigSingleOffset); put(3L, shortPackableCollision); put(4L, intPackableCollision); put(6L, singleOffset);
             put(9L, multiCollision); put(10L, unpackableCollision); put(12L, singleOffset); put(13L, singleOffset);
@@ -86,20 +81,18 @@ public class TokenTreeTest
             put(121L, singleOffset); put(122L, singleOffset); put(123L, singleOffset); put(125L, singleOffset);
     }};
 
-    final static SortedMap<Long, KeyOffsets> bigTokensMap = new TreeMap<Long, KeyOffsets>()
+    final static SortedMap<Long, LongSet> bigTokensMap = new TreeMap<Long, LongSet>()
     {{
             for (long i = 0; i < 1000000; i++)
                 put(i, singleOffset);
     }};
 
-    final static SortedMap<Long, KeyOffsets> collidingTokensMap = new TreeMap<Long, KeyOffsets>()
+    final static SortedMap<Long, LongSet> collidingTokensMap = new TreeMap<Long, LongSet>()
     {{
-        put(1L, singleOffset);
-        put(7L, singleOffset);
-        put(8L, singleOffset);
+            put(1L, singleOffset); put(7L, singleOffset); put(8L, singleOffset);
     }};
 
-    final static SortedMap<Long, KeyOffsets> tokens = bigTokensMap;
+    final static SortedMap<Long, LongSet> tokens = bigTokensMap;
 
     final static SequentialWriterOption DEFAULT_OPT = SequentialWriterOption.newBuilder().bufferSize(4096).build();
 
@@ -146,7 +139,7 @@ public class TokenTreeTest
     }
 
 
-    public void buildSerializeAndIterate(TokenTreeBuilder builder, SortedMap<Long, KeyOffsets> tokenMap) throws Exception
+    public void buildSerializeAndIterate(TokenTreeBuilder builder, SortedMap<Long, LongSet> tokenMap) throws Exception
     {
 
         builder.finish();
@@ -162,12 +155,12 @@ public class TokenTreeTest
         final RandomAccessReader reader = RandomAccessReader.open(treeFile);
         final TokenTree tokenTree = new TokenTree(new MappedBuffer(reader));
 
-        final Iterator<Token> tokenIterator = tokenTree.iterator(KeyConverter.instance);
-        final Iterator<Map.Entry<Long, KeyOffsets>> listIterator = tokenMap.entrySet().iterator();
+        final Iterator<Token> tokenIterator = tokenTree.iterator(KEY_CONVERTER);
+        final Iterator<Map.Entry<Long, LongSet>> listIterator = tokenMap.entrySet().iterator();
         while (tokenIterator.hasNext() && listIterator.hasNext())
         {
             Token treeNext = tokenIterator.next();
-            Map.Entry<Long, KeyOffsets> listNext = listIterator.next();
+            Map.Entry<Long, LongSet> listNext = listIterator.next();
 
             Assert.assertEquals(listNext.getKey(), treeNext.get());
             Assert.assertEquals(convert(listNext.getValue()), convert(treeNext));
@@ -200,15 +193,15 @@ public class TokenTreeTest
 
         for (long i = 0; i <= tokMax; i++)
         {
-            TokenTree.OnDiskToken result = tokenTree.get(i, KeyConverter.instance);
+            TokenTree.OnDiskToken result = tokenTree.get(i, KEY_CONVERTER);
             Assert.assertNotNull("failed to find object for token " + i, result);
 
-            KeyOffsets found = result.getOffsets();
+            LongSet found = result.getOffsets();
             Assert.assertEquals(1, found.size());
-            Assert.assertEquals(i, found.iterator().next().key);
+            Assert.assertEquals(i, found.toArray()[0]);
         }
 
-        Assert.assertNull("found missing object", tokenTree.get(tokMax + 10, KeyConverter.instance));
+        Assert.assertNull("found missing object", tokenTree.get(tokMax + 10, KEY_CONVERTER));
     }
 
     @Test
@@ -223,7 +216,7 @@ public class TokenTreeTest
         buildSerializeIterateAndSkip(new StaticTokenTreeBuilder(new FakeCombinedTerm(tokens)), tokens);
     }
 
-    public void buildSerializeIterateAndSkip(TokenTreeBuilder builder, SortedMap<Long, KeyOffsets> tokens) throws Exception
+    public void buildSerializeIterateAndSkip(TokenTreeBuilder builder, SortedMap<Long, LongSet> tokens) throws Exception
     {
         builder.finish();
         final File treeFile = File.createTempFile("token-tree-iterate-test2", "tt");
@@ -238,7 +231,7 @@ public class TokenTreeTest
         final RandomAccessReader reader = RandomAccessReader.open(treeFile);
         final TokenTree tokenTree = new TokenTree(new MappedBuffer(reader));
 
-        final RangeIterator<Long, Token> treeIterator = tokenTree.iterator(KeyConverter.instance);
+        final RangeIterator<Long, Token> treeIterator = tokenTree.iterator(KEY_CONVERTER);
         final RangeIterator<Long, TokenWithOffsets> listIterator = new EntrySetSkippableIterator(tokens);
 
         long lastToken = 0L;
@@ -282,7 +275,7 @@ public class TokenTreeTest
         skipPastEnd(new StaticTokenTreeBuilder(new FakeCombinedTerm(simpleTokenMap)), simpleTokenMap);
     }
 
-    public void skipPastEnd(TokenTreeBuilder builder, SortedMap<Long, KeyOffsets> tokens) throws Exception
+    public void skipPastEnd(TokenTreeBuilder builder, SortedMap<Long, LongSet> tokens) throws Exception
     {
         builder.finish();
         final File treeFile = File.createTempFile("token-tree-skip-past-test", "tt");
@@ -295,7 +288,7 @@ public class TokenTreeTest
         }
 
         final RandomAccessReader reader = RandomAccessReader.open(treeFile);
-        final RangeIterator<Long, Token> tokenTree = new TokenTree(new MappedBuffer(reader)).iterator(KeyConverter.instance);
+        final RangeIterator<Long, Token> tokenTree = new TokenTree(new MappedBuffer(reader)).iterator(KEY_CONVERTER);
 
         tokenTree.skipTo(tokens.lastKey() + 10);
     }
@@ -320,8 +313,8 @@ public class TokenTreeTest
         TokenTree treeA = generateTree(min, max, isStatic);
         TokenTree treeB = generateTree(min, max, isStatic);
 
-        RangeIterator<Long, Token> a = treeA.iterator(KeyConverter.instance);
-        RangeIterator<Long, Token> b = treeB.iterator(KeyConverter.instance);
+        RangeIterator<Long, Token> a = treeA.iterator(new KeyConverter());
+        RangeIterator<Long, Token> b = treeB.iterator(new KeyConverter());
 
         long count = min;
         while (a.hasNext() && b.hasNext())
@@ -339,8 +332,7 @@ public class TokenTreeTest
             // should fail when trying to merge different tokens
             try
             {
-                long l = tokenA.get();
-                tokenA.merge(new TokenWithOffsets(l + 1, convert(count)));
+                tokenA.merge(new TokenWithOffsets(tokenA.get() + 1, convert(count)));
                 Assert.fail();
             }
             catch (IllegalArgumentException e)
@@ -349,8 +341,8 @@ public class TokenTreeTest
             }
 
             final Set<Long> offsets = new TreeSet<>();
-            for (RowKey key : tokenA)
-                offsets.add(LongType.instance.compose(key.decoratedKey.getKey()));
+            for (DecoratedKey key : tokenA)
+                 offsets.add(LongType.instance.compose(key.getKey()));
 
             Set<Long> expected = new TreeSet<>();
             {
@@ -381,7 +373,7 @@ public class TokenTreeTest
         testMergingOfEqualTokenTrees(bigTokensMap);
     }
 
-    public void testMergingOfEqualTokenTrees(SortedMap<Long, KeyOffsets> tokensMap) throws Exception
+    public void testMergingOfEqualTokenTrees(SortedMap<Long, LongSet> tokensMap) throws Exception
     {
         TokenTreeBuilder tokensA = new DynamicTokenTreeBuilder(tokensMap);
         TokenTreeBuilder tokensB = new DynamicTokenTreeBuilder(tokensMap);
@@ -394,8 +386,8 @@ public class TokenTreeTest
             public RangeIterator<Long, Token> getTokenIterator()
             {
                 RangeIterator.Builder<Long, Token> union = RangeUnionIterator.builder();
-                union.add(a.iterator(KeyConverter.instance));
-                union.add(b.iterator(KeyConverter.instance));
+                union.add(a.iterator(new KeyConverter()));
+                union.add(b.iterator(new KeyConverter()));
 
                 return union.build();
             }
@@ -403,30 +395,31 @@ public class TokenTreeTest
 
         TokenTree c = buildTree(tokensC);
         Assert.assertEquals(tokensMap.size(), c.getCount());
-        Iterator<Token> tokenIterator = c.iterator(KeyConverter.instance);
-        Iterator<Map.Entry<Long, KeyOffsets>> listIterator = tokensMap.entrySet().iterator();
 
+        Iterator<Token> tokenIterator = c.iterator(KEY_CONVERTER);
+        Iterator<Map.Entry<Long, LongSet>> listIterator = tokensMap.entrySet().iterator();
         while (tokenIterator.hasNext() && listIterator.hasNext())
         {
             Token treeNext = tokenIterator.next();
-            Map.Entry<Long, KeyOffsets> listNext = listIterator.next();
+            Map.Entry<Long, LongSet> listNext = listIterator.next();
 
             Assert.assertEquals(listNext.getKey(), treeNext.get());
             Assert.assertEquals(convert(listNext.getValue()), convert(treeNext));
         }
 
-        for (Map.Entry<Long, KeyOffsets> entry : tokensMap.entrySet())
+        for (Map.Entry<Long, LongSet> entry : tokensMap.entrySet())
         {
-            TokenTree.OnDiskToken result = c.get(entry.getKey(), KeyConverter.instance);
+            TokenTree.OnDiskToken result = c.get(entry.getKey(), KEY_CONVERTER);
             Assert.assertNotNull("failed to find object for token " + entry.getKey(), result);
-            KeyOffsets found = result.getOffsets();
+
+            LongSet found = result.getOffsets();
             Assert.assertEquals(entry.getValue(), found);
 
         }
     }
 
 
-    private TokenTree buildTree(TokenTreeBuilder builder) throws Exception
+    private static TokenTree buildTree(TokenTreeBuilder builder) throws Exception
     {
         builder.finish();
         final File treeFile = File.createTempFile("token-tree-", "db");
@@ -444,9 +437,9 @@ public class TokenTreeTest
 
     private static class EntrySetSkippableIterator extends RangeIterator<Long, TokenWithOffsets>
     {
-        private final PeekingIterator<Map.Entry<Long, KeyOffsets>> elements;
+        private final PeekingIterator<Map.Entry<Long, LongSet>> elements;
 
-        EntrySetSkippableIterator(SortedMap<Long, KeyOffsets> elms)
+        EntrySetSkippableIterator(SortedMap<Long, LongSet> elms)
         {
             super(elms.firstKey(), elms.lastKey(), elms.size());
             elements = Iterators.peekingIterator(elms.entrySet().iterator());
@@ -458,7 +451,7 @@ public class TokenTreeTest
             if (!elements.hasNext())
                 return endOfData();
 
-            Map.Entry<Long, KeyOffsets> next = elements.next();
+            Map.Entry<Long, LongSet> next = elements.next();
             return new TokenWithOffsets(next.getKey(), next.getValue());
         }
 
@@ -485,9 +478,9 @@ public class TokenTreeTest
 
     public static class FakeCombinedTerm extends CombinedTerm
     {
-        private final SortedMap<Long, KeyOffsets> tokens;
+        private final SortedMap<Long, LongSet> tokens;
 
-        public FakeCombinedTerm(SortedMap<Long, KeyOffsets> tokens)
+        public FakeCombinedTerm(SortedMap<Long, LongSet> tokens)
         {
             super(null, null);
             this.tokens = tokens;
@@ -501,9 +494,9 @@ public class TokenTreeTest
 
     public static class TokenMapIterator extends RangeIterator<Long, Token>
     {
-        public final Iterator<Map.Entry<Long, KeyOffsets>> iterator;
+        public final Iterator<Map.Entry<Long, LongSet>> iterator;
 
-        public TokenMapIterator(SortedMap<Long, KeyOffsets> tokens)
+        public TokenMapIterator(SortedMap<Long, LongSet> tokens)
         {
             super(tokens.firstKey(), tokens.lastKey(), tokens.size());
             iterator = tokens.entrySet().iterator();
@@ -514,7 +507,7 @@ public class TokenTreeTest
             if (!iterator.hasNext())
                 return endOfData();
 
-            Map.Entry<Long, KeyOffsets> entry = iterator.next();
+            Map.Entry<Long, LongSet> entry = iterator.next();
             return new TokenWithOffsets(entry.getKey(), entry.getValue());
         }
 
@@ -531,16 +524,16 @@ public class TokenTreeTest
 
     public static class TokenWithOffsets extends Token
     {
-        private final KeyOffsets offsets;
+        private final LongSet offsets;
 
-        public TokenWithOffsets(Long token, final KeyOffsets offsets)
+        public TokenWithOffsets(long token, final LongSet offsets)
         {
             super(token);
             this.offsets = offsets;
         }
 
         @Override
-        public KeyOffsets getOffsets()
+        public LongSet getOffsets()
         {
             return offsets;
         }
@@ -578,56 +571,71 @@ public class TokenTreeTest
         }
 
         @Override
-        public Iterator<RowKey> iterator()
+        public Iterator<DecoratedKey> iterator()
         {
-            List<RowKey> keys = new ArrayList<>(offsets.size());
-            for (LongObjectCursor<long[]> offset : offsets)
-                for (long l : offset.value)
-                    keys.add(KeyConverter.instance.getRowKey(offset.key, l));
+            List<DecoratedKey> keys = new ArrayList<>(offsets.size());
+            for (LongCursor offset : offsets)
+                 keys.add(dk(offset.value));
+
             return keys.iterator();
         }
     }
 
-    private static Set<RowKey> convert(KeyOffsets offsets)
+    private static Set<DecoratedKey> convert(LongSet offsets)
     {
-        Set<RowKey> keys = new HashSet<>();
-        for (LongObjectCursor<long[]> offset : offsets)
-            for (long l : offset.value)
-                keys.add(new RowKey(KeyConverter.dk(offset.key),
-                                    KeyConverter.ck(l),
-                                    CLUSTERING_COMPARATOR));
+        Set<DecoratedKey> keys = new HashSet<>();
+        for (LongCursor offset : offsets)
+            keys.add(KEY_CONVERTER.apply(offset.value));
 
         return keys;
     }
 
-    private static Set<RowKey> convert(Token results)
+    private static Set<DecoratedKey> convert(Token results)
     {
-        Set<RowKey> keys = new HashSet<>();
-        for (RowKey key : results)
+        Set<DecoratedKey> keys = new HashSet<>();
+        for (DecoratedKey key : results)
             keys.add(key);
 
         return keys;
     }
 
-    private static KeyOffsets convert(long... values)
+    private static LongSet convert(long... values)
     {
-        KeyOffsets result = new KeyOffsets(values.length);
+        LongSet result = new LongOpenHashSet(values.length);
         for (long v : values)
-            result.put(v, KeyOffsets.asArray(v + 5));
+            result.add(v);
 
         return result;
     }
 
-    private TokenTree generateTree(final long minToken, final long maxToken, boolean isStatic) throws IOException
+    private static class KeyConverter implements Function<Long, DecoratedKey>
+    {
+        @Override
+        public DecoratedKey apply(Long offset)
+        {
+            return dk(offset);
+        }
+    }
+
+    private static DecoratedKey dk(Long token)
+    {
+        ByteBuffer buf = ByteBuffer.allocate(8);
+        buf.putLong(token);
+        buf.flip();
+        Long hashed = MurmurHash.hash2_64(buf, buf.position(), buf.remaining(), 0);
+        return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(hashed), buf);
+    }
+
+    private static TokenTree generateTree(final long minToken, final long maxToken, boolean isStatic) throws IOException
     {
-        final SortedMap<Long, KeyOffsets> toks = new TreeMap<Long, KeyOffsets>()
+        final SortedMap<Long, LongSet> toks = new TreeMap<Long, LongSet>()
         {{
-            for (long i = minToken; i <= maxToken; i++)
-            {
-                KeyOffsets offsetSet = new KeyOffsets();
-                offsetSet.put(i, KeyOffsets.asArray(i + 5));
-                put(i, offsetSet);
-            }
+                for (long i = minToken; i <= maxToken; i++)
+                {
+                    LongSet offsetSet = new LongOpenHashSet();
+                    offsetSet.add(i);
+                    put(i, offsetSet);
+                }
         }};
 
         final TokenTreeBuilder builder = isStatic ? new StaticTokenTreeBuilder(new FakeCombinedTerm(toks)) : new DynamicTokenTreeBuilder(toks);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java b/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java
index f89dd6c..e388cd4 100644
--- a/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java
@@ -47,7 +47,7 @@ import org.junit.*;
 
 public class OperationTest extends SchemaLoader
 {
-    private static final String KS_NAME = "operation_test";
+    private static final String KS_NAME = "sasi";
     private static final String CF_NAME = "test_cf";
     private static final String CLUSTERING_CF_NAME = "clustering_test_cf";
     private static final String STATIC_CF_NAME = "static_sasi_test_cf";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/test/unit/org/apache/cassandra/index/sasi/utils/KeyConverter.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/utils/KeyConverter.java b/test/unit/org/apache/cassandra/index/sasi/utils/KeyConverter.java
deleted file mode 100644
index 7de502a..0000000
--- a/test/unit/org/apache/cassandra/index/sasi/utils/KeyConverter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.index.sasi.utils;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.index.sasi.*;
-import org.apache.cassandra.index.sasi.disk.*;
-import org.apache.cassandra.utils.*;
-
-public class KeyConverter implements KeyFetcher
-{
-    public final static KeyConverter instance = new KeyConverter();
-
-    KeyConverter()
-    {}
-
-    @Override
-    public DecoratedKey getPartitionKey(long offset)
-    {
-        return dk(offset);
-    }
-
-    @Override
-    public Clustering getClustering(long offset)
-    {
-        return ck(offset);
-    }
-
-    @Override
-
-    public RowKey getRowKey(long partitionOffset, long rowOffset)
-    {
-        return new RowKey(getPartitionKey(partitionOffset), getClustering(rowOffset), new ClusteringComparator(LongType.instance));
-    }
-
-    public static DecoratedKey dk(long partitionOffset)
-    {
-        ByteBuffer buf = ByteBuffer.allocate(8);
-        buf.putLong(partitionOffset);
-        buf.flip();
-        Long hashed = MurmurHash.hash2_64(buf, buf.position(), buf.remaining(), 0);
-        return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(hashed), buf);
-    }
-
-    public static Clustering ck(long offset)
-    {
-        return Clustering.make(ByteBufferUtil.bytes(offset));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java b/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java
index 4f80a1c..205d28f 100644
--- a/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java
+++ b/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java
@@ -26,8 +26,6 @@ import java.util.List;
 import com.carrotsearch.hppc.LongOpenHashSet;
 import com.carrotsearch.hppc.LongSet;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.index.sasi.disk.KeyOffsets;
-import org.apache.cassandra.index.sasi.disk.RowKey;
 import org.apache.cassandra.index.sasi.disk.Token;
 
 public class LongIterator extends RangeIterator<Long, Token>
@@ -84,13 +82,13 @@ public class LongIterator extends RangeIterator<Long, Token>
         }
 
         @Override
-        public KeyOffsets getOffsets()
+        public LongSet getOffsets()
         {
-            return new KeyOffsets(4);
+            return new LongOpenHashSet(4);
         }
 
         @Override
-        public Iterator<RowKey> iterator()
+        public Iterator<DecoratedKey> iterator()
         {
             return Collections.emptyIterator();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/test/unit/org/apache/cassandra/index/sasi/utils/RangeUnionIteratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/utils/RangeUnionIteratorTest.java b/test/unit/org/apache/cassandra/index/sasi/utils/RangeUnionIteratorTest.java
index 4819c0d..f69086b 100644
--- a/test/unit/org/apache/cassandra/index/sasi/utils/RangeUnionIteratorTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/utils/RangeUnionIteratorTest.java
@@ -31,23 +31,6 @@ import static org.apache.cassandra.index.sasi.utils.LongIterator.convert;
 public class RangeUnionIteratorTest
 {
     @Test
-    public void mergingOfEqualTokensTest()
-    {
-        RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
-
-        int size = 1000000;
-        final long[] arr = new long[size];
-        for (int i = 0; i < size; i++)
-            arr[i] = i;
-
-        builder.add(new LongIterator(arr));
-        builder.add(new LongIterator(arr));
-
-        Assert.assertEquals(convert(arr), convert(builder.build()));
-    }
-
-
-    @Test
     public void testNoOverlappingValues()
     {
         RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();


[3/3] cassandra git commit: Revert "Add row offset support to SASI"

Posted by xe...@apache.org.
Revert "Add row offset support to SASI"

This reverts commit 7d857b46fb070548bf5e5f6ff81db588f08ec22a.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/490c1c27
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/490c1c27
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/490c1c27

Branch: refs/heads/cassandra-3.X
Commit: 490c1c27c9b700f14212d9591a516ddb8d0865c7
Parents: a1eef56
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Thu Nov 17 15:20:04 2016 -0800
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Thu Nov 17 15:20:04 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 -
 .../org/apache/cassandra/db/ColumnIndex.java    |   6 +-
 .../apache/cassandra/index/sasi/KeyFetcher.java |  98 -------
 .../apache/cassandra/index/sasi/SASIIndex.java  |  11 +-
 .../cassandra/index/sasi/SASIIndexBuilder.java  |  13 +-
 .../cassandra/index/sasi/SSTableIndex.java      |  41 ++-
 .../cassandra/index/sasi/conf/ColumnIndex.java  |   4 +-
 .../index/sasi/conf/view/RangeTermTree.java     |   4 -
 .../sasi/disk/AbstractTokenTreeBuilder.java     | 276 ++++++++----------
 .../cassandra/index/sasi/disk/Descriptor.java   |  33 +--
 .../sasi/disk/DynamicTokenTreeBuilder.java      |  59 ++--
 .../cassandra/index/sasi/disk/KeyOffsets.java   | 115 --------
 .../cassandra/index/sasi/disk/OnDiskIndex.java  |  12 +-
 .../index/sasi/disk/OnDiskIndexBuilder.java     |  16 +-
 .../index/sasi/disk/PerSSTableIndexWriter.java  |  13 +-
 .../cassandra/index/sasi/disk/RowKey.java       | 108 -------
 .../index/sasi/disk/StaticTokenTreeBuilder.java |  18 +-
 .../apache/cassandra/index/sasi/disk/Token.java |   9 +-
 .../cassandra/index/sasi/disk/TokenTree.java    | 288 +++++++------------
 .../index/sasi/disk/TokenTreeBuilder.java       |  72 ++---
 .../index/sasi/memory/IndexMemtable.java        |   8 +-
 .../index/sasi/memory/KeyRangeIterator.java     |  49 ++--
 .../cassandra/index/sasi/memory/MemIndex.java   |   4 +-
 .../index/sasi/memory/SkipListMemIndex.java     |  12 +-
 .../index/sasi/memory/TrieMemIndex.java         |  45 +--
 .../index/sasi/plan/QueryController.java        |  49 ++--
 .../cassandra/index/sasi/plan/QueryPlan.java    | 174 +++--------
 .../io/sstable/format/SSTableFlushObserver.java |   5 -
 .../io/sstable/format/SSTableReader.java        |  33 +--
 .../io/sstable/format/big/BigTableWriter.java   |   8 +-
 .../org/apache/cassandra/utils/obs/BitUtil.java |   2 +-
 test/data/legacy-sasi/on-disk-sa-int2.db        | Bin 12312 -> 0 bytes
 .../cassandra/index/sasi/SASIIndexTest.java     |  25 +-
 .../index/sasi/disk/KeyOffsetsTest.java         |  48 ----
 .../index/sasi/disk/OnDiskIndexTest.java        | 216 +++++++-------
 .../sasi/disk/PerSSTableIndexWriterTest.java    | 112 ++------
 .../index/sasi/disk/TokenTreeTest.java          | 208 +++++++-------
 .../index/sasi/plan/OperationTest.java          |   2 +-
 .../index/sasi/utils/KeyConverter.java          |  69 -----
 .../index/sasi/utils/LongIterator.java          |   8 +-
 .../sasi/utils/RangeUnionIteratorTest.java      |  17 --
 41 files changed, 745 insertions(+), 1546 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index db06341..6ca26f9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -45,7 +45,6 @@
  * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
  * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
  * Add JMH benchmarks.jar (CASSANDRA-12586)
- * Add row offset support to SASI (CASSANDRA-11990)
  * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
  * Add keep-alive to streaming (CASSANDRA-11841)
  * Tracing payload is passed through newSession(..) (CASSANDRA-11706)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 8ea1272..de1b1df 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -121,10 +121,9 @@ public class ColumnIndex
         {
             Row staticRow = iterator.staticRow();
 
-            long startPosition = currentPosition();
             UnfilteredSerializer.serializer.serializeStaticRow(staticRow, header, writer, version);
             if (!observers.isEmpty())
-                observers.forEach((o) -> o.nextUnfilteredCluster(staticRow, startPosition));
+                observers.forEach((o) -> o.nextUnfilteredCluster(staticRow));
         }
     }
 
@@ -235,7 +234,6 @@ public class ColumnIndex
 
     private void add(Unfiltered unfiltered) throws IOException
     {
-        final long origPos = writer.position();
         long pos = currentPosition();
 
         if (firstClustering == null)
@@ -249,7 +247,7 @@ public class ColumnIndex
 
         // notify observers about each new row
         if (!observers.isEmpty())
-            observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered, origPos));
+            observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered));
 
         lastClustering = unfiltered.clustering();
         previousRowStart = pos;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java b/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java
deleted file mode 100644
index 80ee167..0000000
--- a/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.index.sasi;
-
-import java.io.IOException;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.index.sasi.disk.*;
-import org.apache.cassandra.io.*;
-import org.apache.cassandra.io.sstable.format.*;
-
-
-public interface KeyFetcher
-{
-    public Clustering getClustering(long offset);
-    public DecoratedKey getPartitionKey(long offset);
-
-    public RowKey getRowKey(long partitionOffset, long rowOffset);
-
-    /**
-     * Fetches clustering and partition key from the sstable.
-     *
-     * Currently, clustering key is fetched from the data file of the sstable and partition key is
-     * read from the index file. Reading from index file helps us to warm up key cache in this case.
-     */
-    public static class SSTableKeyFetcher implements KeyFetcher
-    {
-        private final SSTableReader sstable;
-
-        public SSTableKeyFetcher(SSTableReader reader)
-        {
-            sstable = reader;
-        }
-
-        @Override
-        public Clustering getClustering(long offset)
-        {
-            try
-            {
-                return sstable.clusteringAt(offset);
-            }
-            catch (IOException e)
-            {
-                throw new FSReadError(new IOException("Failed to read clustering from " + sstable.descriptor, e), sstable.getFilename());
-            }
-        }
-
-        @Override
-        public DecoratedKey getPartitionKey(long offset)
-        {
-            try
-            {
-                return sstable.keyAt(offset);
-            }
-            catch (IOException e)
-            {
-                throw new FSReadError(new IOException("Failed to read key from " + sstable.descriptor, e), sstable.getFilename());
-            }
-        }
-
-        @Override
-        public RowKey getRowKey(long partitionOffset, long rowOffset)
-        {
-            if (rowOffset == KeyOffsets.NO_OFFSET)
-                return new RowKey(getPartitionKey(partitionOffset), null, sstable.metadata.comparator);
-            else
-                return new RowKey(getPartitionKey(partitionOffset), getClustering(rowOffset), sstable.metadata.comparator);
-        }
-
-        public int hashCode()
-        {
-            return sstable.descriptor.hashCode();
-        }
-
-        public boolean equals(Object other)
-        {
-            return other instanceof SSTableKeyFetcher
-                   && sstable.descriptor.equals(((SSTableKeyFetcher) other).sstable.descriptor);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index 65953a9..4375964 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@ -46,7 +46,6 @@ import org.apache.cassandra.index.sasi.conf.ColumnIndex;
 import org.apache.cassandra.index.sasi.conf.IndexMode;
 import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder.Mode;
 import org.apache.cassandra.index.sasi.disk.PerSSTableIndexWriter;
-import org.apache.cassandra.index.sasi.disk.RowKey;
 import org.apache.cassandra.index.sasi.plan.QueryPlan;
 import org.apache.cassandra.index.transactions.IndexTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -183,14 +182,6 @@ public class SASIIndex implements Index, INotificationConsumer
         return getTruncateTask(FBUtilities.timestampMicros());
     }
 
-    public Callable<?> getTruncateTask(Collection<SSTableReader> sstablesToRebuild)
-    {
-        return () -> {
-            index.dropData(sstablesToRebuild);
-            return null;
-        };
-    }
-
     public Callable<?> getTruncateTask(long truncatedAt)
     {
         return () -> {
@@ -261,7 +252,7 @@ public class SASIIndex implements Index, INotificationConsumer
             public void insertRow(Row row)
             {
                 if (isNewData())
-                    adjustMemtableSize(index.index(new RowKey(key, row.clustering(), baseCfs.getComparator()), row), opGroup);
+                    adjustMemtableSize(index.index(key, row), opGroup);
             }
 
             public void updateRow(Row oldRow, Row newRow)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
index d6706ea..d50875a 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
@@ -94,23 +94,16 @@ class SASIIndexBuilder extends SecondaryIndexBuilder
                         {
                             RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
                             dataFile.seek(indexEntry.position);
-                            int staticOffset = ByteBufferUtil.readWithShortLength(dataFile).remaining(); // key
+                            ByteBufferUtil.readWithShortLength(dataFile); // key
 
                             try (SSTableIdentityIterator partition = SSTableIdentityIterator.create(sstable, dataFile, key))
                             {
                                 // if the row has statics attached, it has to be indexed separately
                                 if (cfs.metadata.hasStaticColumns())
-                                {
-                                    long staticPosition = indexEntry.position + staticOffset;
-                                    indexWriter.nextUnfilteredCluster(partition.staticRow(), staticPosition);
-                                }
+                                    indexWriter.nextUnfilteredCluster(partition.staticRow());
 
-                                long position = dataFile.getPosition();
                                 while (partition.hasNext())
-                                {
-                                    indexWriter.nextUnfilteredCluster(partition.next(), position);
-                                    position = dataFile.getPosition();
-                                }
+                                    indexWriter.nextUnfilteredCluster(partition.next());
                             }
                         }
                         catch (IOException ex)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
index f9c8abf..c67c39c 100644
--- a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
@@ -18,22 +18,28 @@
 package org.apache.cassandra.index.sasi;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.index.sasi.conf.ColumnIndex;
-import org.apache.cassandra.index.sasi.disk.*;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndex;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.concurrent.Ref;
 
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
+import com.google.common.base.Function;
+
 public class SSTableIndex
 {
     private final ColumnIndex columnIndex;
@@ -59,7 +65,7 @@ public class SSTableIndex
                 sstable.getFilename(),
                 columnIndex.getIndexName());
 
-        this.index = new OnDiskIndex(indexFile, validator, new KeyFetcher.SSTableKeyFetcher(sstable));
+        this.index = new OnDiskIndex(indexFile, validator, new DecoratedKeyFetcher(sstable));
     }
 
     public OnDiskIndexBuilder.Mode mode()
@@ -157,5 +163,36 @@ public class SSTableIndex
         return String.format("SSTableIndex(column: %s, SSTable: %s)", columnIndex.getColumnName(), sstable.descriptor);
     }
 
+    private static class DecoratedKeyFetcher implements Function<Long, DecoratedKey>
+    {
+        private final SSTableReader sstable;
+
+        DecoratedKeyFetcher(SSTableReader reader)
+        {
+            sstable = reader;
+        }
+
+        public DecoratedKey apply(Long offset)
+        {
+            try
+            {
+                return sstable.keyAt(offset);
+            }
+            catch (IOException e)
+            {
+                throw new FSReadError(new IOException("Failed to read key from " + sstable.descriptor, e), sstable.getFilename());
+            }
+        }
+
+        public int hashCode()
+        {
+            return sstable.descriptor.hashCode();
+        }
 
+        public boolean equals(Object other)
+        {
+            return other instanceof DecoratedKeyFetcher
+                    && sstable.descriptor.equals(((DecoratedKeyFetcher) other).sstable.descriptor);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
index 459e5c3..0958113 100644
--- a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
@@ -30,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.AsciiType;
@@ -39,7 +40,6 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
 import org.apache.cassandra.index.sasi.conf.view.View;
 import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
-import org.apache.cassandra.index.sasi.disk.RowKey;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.memory.IndexMemtable;
 import org.apache.cassandra.index.sasi.plan.Expression;
@@ -99,7 +99,7 @@ public class ColumnIndex
         return keyValidator;
     }
 
-    public long index(RowKey key, Row row)
+    public long index(DecoratedKey key, Row row)
     {
         return getCurrentMemtable().index(key, getValueOf(column, row, FBUtilities.nowInSeconds()));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
index 2775c29..d6b4551 100644
--- a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
+++ b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.index.sasi.conf.view;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -47,9 +46,6 @@ public class RangeTermTree implements TermTree
 
     public Set<SSTableIndex> search(Expression e)
     {
-        if (e == null)
-            return Collections.emptySet();
-
         ByteBuffer minTerm = e.lower == null ? min : e.lower.value;
         ByteBuffer maxTerm = e.upper == null ? max : e.upper.value;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
index 9245960..18994de 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
@@ -20,18 +20,19 @@ package org.apache.cassandra.index.sasi.disk;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.function.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
-import com.carrotsearch.hppc.LongArrayList;
-import com.carrotsearch.hppc.cursors.LongCursor;
-import com.carrotsearch.hppc.cursors.LongObjectCursor;
-import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
+import com.carrotsearch.hppc.LongArrayList;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
+
 public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
 {
     protected int numBlocks;
@@ -64,7 +65,7 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
     public int serializedSize()
     {
         if (numBlocks == 1)
-            return BLOCK_HEADER_BYTES + ((int) tokenCount * LEAF_ENTRY_BYTES);
+            return (BLOCK_HEADER_BYTES + ((int) tokenCount * 16));
         else
             return numBlocks * BLOCK_BYTES;
     }
@@ -111,15 +112,6 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
         buffer.clear();
     }
 
-    /**
-     * Tree node,
-     *
-     * B+-tree consists of root, interior nodes and leaves. Root can be either a node or a leaf.
-     *
-     * Depending on the concrete implementation of {@code TokenTreeBuilder}
-     * leaf can be partial or static (in case of {@code StaticTokenTreeBuilder} or dynamic in case
-     * of {@code DynamicTokenTreeBuilder}
-     */
     protected abstract class Node
     {
         protected InteriorNode parent;
@@ -187,16 +179,8 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
             alignBuffer(buf, BLOCK_HEADER_BYTES);
         }
 
-        /**
-         * Shared header part, written for all node types:
-         *     [  info byte  ] [  token count   ] [ min node token ] [ max node token ]
-         *     [      1b     ] [    2b (short)  ] [   8b (long)    ] [    8b (long)   ]
-         **/
         private abstract class Header
         {
-            /**
-             * Serializes the shared part of the header
-             */
             public void serialize(ByteBuffer buf)
             {
                 buf.put(infoByte())
@@ -208,12 +192,6 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
             protected abstract byte infoByte();
         }
 
-        /**
-         * In addition to shared header part, root header stores version information,
-         * overall token count and min/max tokens for the whole tree:
-         *     [      magic    ] [  overall token count  ] [ min tree token ] [ max tree token ]
-         *     [   2b (short)  ] [         8b (long)     ] [   8b (long)    ] [    8b (long)   ]
-         */
         private class RootHeader extends Header
         {
             public void serialize(ByteBuffer buf)
@@ -229,21 +207,19 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
             {
                 // if leaf, set leaf indicator and last leaf indicator (bits 0 & 1)
                 // if not leaf, clear both bits
-                return isLeaf() ? ENTRY_TYPE_MASK : 0;
+                return (byte) ((isLeaf()) ? 3 : 0);
             }
 
             protected void writeMagic(ByteBuffer buf)
             {
                 switch (Descriptor.CURRENT_VERSION)
                 {
-                    case ab:
+                    case Descriptor.VERSION_AB:
                         buf.putShort(AB_MAGIC);
                         break;
-                    case ac:
-                        buf.putShort(AC_MAGIC);
-                        break;
+
                     default:
-                        throw new RuntimeException("Unsupported version");
+                        break;
                 }
 
             }
@@ -273,12 +249,6 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
 
     }
 
-    /**
-     * Leaf consists of
-     *   - header (format described in {@code Header} )
-     *   - data (format described in {@code LeafEntry})
-     *   - overflow collision entries, that hold {@value OVERFLOW_TRAILER_CAPACITY} of {@code RowOffset}.
-     */
     protected abstract class Leaf extends Node
     {
         protected LongArrayList overflowCollisions;
@@ -309,98 +279,82 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
 
         protected abstract void serializeData(ByteBuffer buf);
 
-        protected LeafEntry createEntry(final long tok, final KeyOffsets offsets)
+        protected LeafEntry createEntry(final long tok, final LongSet offsets)
         {
-            LongArrayList rawOffsets = new LongArrayList(offsets.size());
-
-            offsets.forEach(new Consumer<LongObjectCursor<long[]>>()
-            {
-                public void accept(LongObjectCursor<long[]> cursor)
-                {
-                    for (long l : cursor.value)
-                    {
-                        rawOffsets.add(cursor.key);
-                        rawOffsets.add(l);
-                    }
-                }
-            });
-
-            int offsetCount = rawOffsets.size();
+            int offsetCount = offsets.size();
             switch (offsetCount)
             {
                 case 0:
                     throw new AssertionError("no offsets for token " + tok);
+                case 1:
+                    long offset = offsets.toArray()[0];
+                    if (offset > MAX_OFFSET)
+                        throw new AssertionError("offset " + offset + " cannot be greater than " + MAX_OFFSET);
+                    else if (offset <= Integer.MAX_VALUE)
+                        return new SimpleLeafEntry(tok, offset);
+                    else
+                        return new FactoredOffsetLeafEntry(tok, offset);
                 case 2:
-                    return new SimpleLeafEntry(tok, rawOffsets.get(0), rawOffsets.get(1));
+                    long[] rawOffsets = offsets.toArray();
+                    if (rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= Integer.MAX_VALUE &&
+                        (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= Short.MAX_VALUE))
+                        return new PackedCollisionLeafEntry(tok, rawOffsets);
+                    else
+                        return createOverflowEntry(tok, offsetCount, offsets);
                 default:
-                    assert offsetCount % 2 == 0;
-                    if (offsetCount == 4)
-                    {
-                        if (rawOffsets.get(0) < Integer.MAX_VALUE && rawOffsets.get(1) < Integer.MAX_VALUE &&
-                            rawOffsets.get(2) < Integer.MAX_VALUE && rawOffsets.get(3) < Integer.MAX_VALUE)
-                        {
-                            return new PackedCollisionLeafEntry(tok, (int)rawOffsets.get(0), (int) rawOffsets.get(1),
-                                                                (int) rawOffsets.get(2), (int) rawOffsets.get(3));
-                        }
-                    }
-                    return createOverflowEntry(tok, offsetCount, rawOffsets);
+                    return createOverflowEntry(tok, offsetCount, offsets);
             }
         }
 
-        private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongArrayList offsets)
+        private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongSet offsets)
         {
             if (overflowCollisions == null)
-                overflowCollisions = new LongArrayList(offsetCount);
-
-            int overflowCount = (overflowCollisions.size() + offsetCount) / 2;
-            if (overflowCount >= OVERFLOW_TRAILER_CAPACITY)
-                throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf, but had: " + overflowCount);
+                overflowCollisions = new LongArrayList();
 
-            LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) (overflowCollisions.size() / 2), (short) (offsetCount / 2));
-            overflowCollisions.addAll(offsets);
+            LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) overflowCollisions.size(), (short) offsetCount);
+            for (LongCursor o : offsets)
+            {
+                if (overflowCollisions.size() == OVERFLOW_TRAILER_CAPACITY)
+                    throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf");
+                else
+                    overflowCollisions.add(o.value);
+            }
             return entry;
         }
 
-        /**
-         * A leaf of the B+-Tree, that holds information about the row offset(s) for
-         * the current token.
-         *
-         * Main 3 types of leaf entries are:
-         *   1) simple leaf entry: holding just a single row offset
-         *   2) packed collision leaf entry: holding two entries that would fit together into 168 bytes
-         *   3) overflow entry: only holds offset in overflow trailer and amount of entries belonging to this leaf
-         */
         protected abstract class LeafEntry
         {
             protected final long token;
 
             abstract public EntryType type();
+            abstract public int offsetData();
+            abstract public short offsetExtra();
 
             public LeafEntry(final long tok)
             {
                 token = tok;
             }
 
-            public abstract void serialize(ByteBuffer buf);
+            public void serialize(ByteBuffer buf)
+            {
+                buf.putShort((short) type().ordinal())
+                   .putShort(offsetExtra())
+                   .putLong(token)
+                   .putInt(offsetData());
+            }
 
         }
 
-        /**
-         * Simple leaf, that can store a single row offset, having the following format:
-         *
-         *     [    type    ] [   token   ] [ partition offset ] [ row offset ]
-         *     [ 2b (short) ] [ 8b (long) ] [     8b (long)    ] [  8b (long) ]
-         */
+
+        // assumes there is a single offset and the offset is <= Integer.MAX_VALUE
         protected class SimpleLeafEntry extends LeafEntry
         {
-            private final long partitionOffset;
-            private final long rowOffset;
+            private final long offset;
 
-            public SimpleLeafEntry(final long tok, final long partitionOffset, final long rowOffset)
+            public SimpleLeafEntry(final long tok, final long off)
             {
                 super(tok);
-                this.partitionOffset = partitionOffset;
-                this.rowOffset = rowOffset;
+                offset = off;
             }
 
             public EntryType type()
@@ -408,38 +362,61 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
                 return EntryType.SIMPLE;
             }
 
-            @Override
-            public void serialize(ByteBuffer buf)
+            public int offsetData()
             {
-                buf.putShort((short) type().ordinal())
-                   .putLong(token)
-                   .putLong(partitionOffset)
-                   .putLong(rowOffset);
+                return (int) offset;
+            }
+
+            public short offsetExtra()
+            {
+                return 0;
             }
         }
 
-        /**
-         * Packed collision entry, can store two offsets, if each one of their positions
-         * fit into 4 bytes.
-         *     [    type    ] [   token   ] [ partition offset 1 ] [ row offset  1] [ partition offset 1 ] [ row offset  1]
-         *     [ 2b (short) ] [ 8b (long) ] [      4b (int)      ] [    4b (int)  ] [      4b (int)      ] [    4b (int)  ]
-         */
-        protected class PackedCollisionLeafEntry extends LeafEntry
+        // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET
+        // take the middle 32 bits of offset (or the top 32 when considering offset is max 48 bits)
+        // and store where offset is normally stored. take bottom 16 bits of offset and store in entry header
+        private class FactoredOffsetLeafEntry extends LeafEntry
         {
-            private final int partitionOffset1;
-            private final int rowOffset1;
-            private final int partitionOffset2;
-            private final int rowOffset2;
+            private final long offset;
 
-            public PackedCollisionLeafEntry(final long tok, final int partitionOffset1, final int rowOffset1,
-                                            final int partitionOffset2, final int rowOffset2)
+            public FactoredOffsetLeafEntry(final long tok, final long off)
             {
                 super(tok);
-                this.partitionOffset1 = partitionOffset1;
-                this.rowOffset1 = rowOffset1;
-                this.partitionOffset2 = partitionOffset2;
-                this.rowOffset2 = rowOffset2;
+                offset = off;
+            }
 
+            public EntryType type()
+            {
+                return EntryType.FACTORED;
+            }
+
+            public int offsetData()
+            {
+                return (int) (offset >>> Short.SIZE);
+            }
+
+            public short offsetExtra()
+            {
+                // extra offset is supposed to be an unsigned 16-bit integer
+                return (short) offset;
+            }
+        }
+
+        // holds an entry with two offsets that can be packed in an int & a short
+        // the int offset is stored where offset is normally stored. short offset is
+        // stored in entry header
+        private class PackedCollisionLeafEntry extends LeafEntry
+        {
+            private short smallerOffset;
+            private int largerOffset;
+
+            public PackedCollisionLeafEntry(final long tok, final long[] offs)
+            {
+                super(tok);
+
+                smallerOffset = (short) Math.min(offs[0], offs[1]);
+                largerOffset = (int) Math.max(offs[0], offs[1]);
             }
 
             public EntryType type()
@@ -447,27 +424,21 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
                 return EntryType.PACKED;
             }
 
-            @Override
-            public void serialize(ByteBuffer buf)
+            public int offsetData()
             {
-                buf.putShort((short) type().ordinal())
-                   .putLong(token)
-                   .putInt(partitionOffset1)
-                   .putInt(rowOffset1)
-                   .putInt(partitionOffset2)
-                   .putInt(rowOffset2);
-            }
-        }
-
-        /**
-         * Overflow collision entry, holds an entry with three or more offsets, or two offsets
-         * that cannot be packed into 16 bytes.
-         *     [    type    ] [   token   ] [ start index ] [    count    ]
-         *     [ 2b (short) ] [ 8b (long) ] [   8b (long) ] [  8b (long)  ]
-         *
-         *   - [ start index ] is a position of first item belonging to this leaf entry in the overflow trailer
-         *   - [ count ] is the amount of items belonging to this leaf entry that are stored in the overflow trailer
-         */
+                return largerOffset;
+            }
+
+            public short offsetExtra()
+            {
+                return smallerOffset;
+            }
+        }
+
+        // holds an entry with three or more offsets, or two offsets that cannot
+        // be packed into an int & a short. the index into the overflow list
+        // is stored where the offset is normally stored. the number of overflowed offsets
+        // for the entry is stored in the entry header
         private class OverflowCollisionLeafEntry extends LeafEntry
         {
             private final short startIndex;
@@ -485,23 +456,20 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
                 return EntryType.OVERFLOW;
             }
 
-            @Override
-            public void serialize(ByteBuffer buf)
+            public int offsetData()
             {
-                buf.putShort((short) type().ordinal())
-                   .putLong(token)
-                   .putLong(startIndex)
-                   .putLong(count);
+                return startIndex;
             }
+
+            public short offsetExtra()
+            {
+                return count;
+            }
+
         }
+
     }
 
-    /**
-     * Interior node consists of:
-     *    - (interior node) header
-     *    - tokens (serialized as longs, with count stored in header)
-     *    - child offsets
-     */
     protected class InteriorNode extends Node
     {
         protected List<Long> tokens = new ArrayList<>(TOKENS_PER_BLOCK);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
index 3fa0f06..3aa6f14 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
@@ -22,29 +22,30 @@ package org.apache.cassandra.index.sasi.disk;
  */
 public class Descriptor
 {
-    public static enum Version
+    public static final String VERSION_AA = "aa";
+    public static final String VERSION_AB = "ab";
+    public static final String CURRENT_VERSION = VERSION_AB;
+    public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION);
+
+    public static class Version
     {
-        aa,
-        ab,
-        ac
-    }
+        public final String version;
 
-    public static final Version VERSION_AA = Version.aa;
-    public static final Version VERSION_AB = Version.ab;
-    public static final Version VERSION_AC = Version.ac;
+        public Version(String version)
+        {
+            this.version = version;
+        }
 
-    public static final Version CURRENT_VERSION = Version.ac;
-    public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION);
+        public String toString()
+        {
+            return version;
+        }
+    }
 
     public final Version version;
 
     public Descriptor(String v)
     {
-        this.version = Version.valueOf(v);
-    }
-
-    public Descriptor(Version v)
-    {
-        this.version = v;
+        this.version = new Version(v);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
index 6e3e163..2ddfd89 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
@@ -20,14 +20,17 @@ package org.apache.cassandra.index.sasi.disk;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.carrotsearch.hppc.cursors.LongObjectCursor;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.Pair;
+
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
 
 public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
 {
+    private final SortedMap<Long, LongSet> tokens = new TreeMap<>();
 
-    private final SortedMap<Long, KeyOffsets> tokens = new TreeMap<>();
 
     public DynamicTokenTreeBuilder()
     {}
@@ -37,52 +40,54 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
         add(data);
     }
 
-    public DynamicTokenTreeBuilder(SortedMap<Long, KeyOffsets> data)
+    public DynamicTokenTreeBuilder(SortedMap<Long, LongSet> data)
     {
         add(data);
     }
 
-    public void add(Long token, long partitionOffset, long rowOffset)
+    public void add(Long token, long keyPosition)
     {
-        KeyOffsets found = tokens.get(token);
+        LongSet found = tokens.get(token);
         if (found == null)
-            tokens.put(token, (found = new KeyOffsets(2)));
+            tokens.put(token, (found = new LongOpenHashSet(2)));
 
-        found.put(partitionOffset, rowOffset);
+        found.add(keyPosition);
     }
 
-    public void add(Iterator<Pair<Long, KeyOffsets>> data)
+    public void add(Iterator<Pair<Long, LongSet>> data)
     {
         while (data.hasNext())
         {
-            Pair<Long, KeyOffsets> entry = data.next();
-            for (LongObjectCursor<long[]> cursor : entry.right)
-                for (long l : cursor.value)
-                    add(entry.left, cursor.key, l);
+            Pair<Long, LongSet> entry = data.next();
+            for (LongCursor l : entry.right)
+                add(entry.left, l.value);
         }
     }
 
-    public void add(SortedMap<Long, KeyOffsets> data)
+    public void add(SortedMap<Long, LongSet> data)
     {
-        for (Map.Entry<Long, KeyOffsets> newEntry : data.entrySet())
+        for (Map.Entry<Long, LongSet> newEntry : data.entrySet())
         {
-            for (LongObjectCursor<long[]> cursor : newEntry.getValue())
-                for (long l : cursor.value)
-                    add(newEntry.getKey(), cursor.key, l);
+            LongSet found = tokens.get(newEntry.getKey());
+            if (found == null)
+                tokens.put(newEntry.getKey(), (found = new LongOpenHashSet(4)));
+
+            for (LongCursor offset : newEntry.getValue())
+                found.add(offset.value);
         }
     }
 
-    public Iterator<Pair<Long, KeyOffsets>> iterator()
+    public Iterator<Pair<Long, LongSet>> iterator()
     {
-        final Iterator<Map.Entry<Long, KeyOffsets>> iterator = tokens.entrySet().iterator();
-        return new AbstractIterator<Pair<Long, KeyOffsets>>()
+        final Iterator<Map.Entry<Long, LongSet>> iterator = tokens.entrySet().iterator();
+        return new AbstractIterator<Pair<Long, LongSet>>()
         {
-            protected Pair<Long, KeyOffsets> computeNext()
+            protected Pair<Long, LongSet> computeNext()
             {
                 if (!iterator.hasNext())
                     return endOfData();
 
-                Map.Entry<Long, KeyOffsets> entry = iterator.next();
+                Map.Entry<Long, LongSet> entry = iterator.next();
                 return Pair.create(entry.getKey(), entry.getValue());
             }
         };
@@ -156,9 +161,9 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
 
     private class DynamicLeaf extends Leaf
     {
-        private final SortedMap<Long, KeyOffsets> tokens;
+        private final SortedMap<Long, LongSet> tokens;
 
-        DynamicLeaf(SortedMap<Long, KeyOffsets> data)
+        DynamicLeaf(SortedMap<Long, LongSet> data)
         {
             super(data.firstKey(), data.lastKey());
             tokens = data;
@@ -176,7 +181,7 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
 
         protected void serializeData(ByteBuffer buf)
         {
-            for (Map.Entry<Long, KeyOffsets> entry : tokens.entrySet())
+            for (Map.Entry<Long, LongSet> entry : tokens.entrySet())
                 createEntry(entry.getKey(), entry.getValue()).serialize(buf);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java b/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java
deleted file mode 100644
index db849fe..0000000
--- a/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.index.sasi.disk;
-
-import java.util.*;
-
-import org.apache.commons.lang3.ArrayUtils;
-
-import com.carrotsearch.hppc.LongObjectOpenHashMap;
-import com.carrotsearch.hppc.cursors.LongObjectCursor;
-
-public class KeyOffsets extends LongObjectOpenHashMap<long[]>
-{
-    public static final long NO_OFFSET = Long.MIN_VALUE;
-
-    public KeyOffsets() {
-        super(4);
-    }
-
-    public KeyOffsets(int initialCapacity) {
-        super(initialCapacity);
-    }
-
-    public void put(long currentPartitionOffset, long currentRowOffset)
-    {
-        if (containsKey(currentPartitionOffset))
-            super.put(currentPartitionOffset, append(get(currentPartitionOffset), currentRowOffset));
-        else
-            super.put(currentPartitionOffset, asArray(currentRowOffset));
-    }
-
-    public long[] put(long currentPartitionOffset, long[] currentRowOffset)
-    {
-        if (containsKey(currentPartitionOffset))
-            return super.put(currentPartitionOffset, merge(get(currentPartitionOffset), currentRowOffset));
-        else
-            return super.put(currentPartitionOffset, currentRowOffset);
-    }
-
-    public boolean equals(Object obj)
-    {
-        if (!(obj instanceof KeyOffsets))
-            return false;
-
-        KeyOffsets other = (KeyOffsets) obj;
-        if (other.size() != this.size())
-            return false;
-
-        for (LongObjectCursor<long[]> cursor : this)
-            if (!Arrays.equals(cursor.value, other.get(cursor.key)))
-                return false;
-
-        return true;
-    }
-
-    @Override
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder("KeyOffsets { ");
-        forEach((a, b) -> {
-            sb.append(a).append(": ").append(Arrays.toString(b));
-        });
-        sb.append(" }");
-        return sb.toString();
-    }
-
-    // primitive array creation
-    public static long[] asArray(long... vals)
-    {
-        return vals;
-    }
-
-    private static long[] merge(long[] arr1, long[] arr2)
-    {
-        long[] copy = new long[arr2.length];
-        int written = 0;
-        for (long l : arr2)
-        {
-            if (!ArrayUtils.contains(arr1, l))
-                copy[written++] = l;
-        }
-
-        if (written == 0)
-            return arr1;
-
-        long[] merged = new long[arr1.length + written];
-        System.arraycopy(arr1, 0, merged, 0, arr1.length);
-        System.arraycopy(copy, 0, merged, arr1.length, written);
-        return merged;
-    }
-
-    private static long[] append(long[] arr1, long v)
-    {
-        if (ArrayUtils.contains(arr1, v))
-            return arr1;
-        else
-            return ArrayUtils.add(arr1, v);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
index 70d24a7..4d43cd9 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
@@ -22,7 +22,8 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.stream.Collectors;
 
-import org.apache.cassandra.index.sasi.*;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.Term;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.plan.Expression.Op;
 import org.apache.cassandra.index.sasi.utils.MappedBuffer;
@@ -36,12 +37,12 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
+import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 
 import static org.apache.cassandra.index.sasi.disk.OnDiskBlock.SearchResult;
-import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.TOKEN_BYTES;
 
 public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
 {
@@ -105,7 +106,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
     protected final long indexSize;
     protected final boolean hasMarkedPartials;
 
-    protected final KeyFetcher keyFetcher;
+    protected final Function<Long, DecoratedKey> keyFetcher;
 
     protected final String indexPath;
 
@@ -115,7 +116,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
     protected final ByteBuffer minTerm, maxTerm, minKey, maxKey;
 
     @SuppressWarnings("resource")
-    public OnDiskIndex(File index, AbstractType<?> cmp, KeyFetcher keyReader)
+    public OnDiskIndex(File index, AbstractType<?> cmp, Function<Long, DecoratedKey> keyReader)
     {
         keyFetcher = keyReader;
 
@@ -634,7 +635,6 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
         {
             final long blockEnd = FBUtilities.align(content.position(), OnDiskIndexBuilder.BLOCK_SIZE);
 
-            // ([int] -1 for sparse, offset for non-sparse)
             if (isSparse())
                 return new PrefetchedTokensIterator(getSparseTokens());
 
@@ -658,7 +658,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
             NavigableMap<Long, Token> individualTokens = new TreeMap<>();
             for (int i = 0; i < size; i++)
             {
-                Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + TOKEN_BYTES * i), keyFetcher);
+                Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + (8 * i)), keyFetcher);
 
                 assert token != null;
                 individualTokens.put(token.get(), token);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
index b6e2da5..4946f06 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.dht.*;
 import org.apache.cassandra.index.sasi.plan.Expression.Op;
 import org.apache.cassandra.index.sasi.sa.IndexedTerm;
 import org.apache.cassandra.index.sasi.sa.IntegralSA;
@@ -38,6 +37,7 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 import com.carrotsearch.hppc.LongArrayList;
+import com.carrotsearch.hppc.LongSet;
 import com.carrotsearch.hppc.ShortArrayList;
 import com.google.common.annotations.VisibleForTesting;
 
@@ -163,7 +163,7 @@ public class OnDiskIndexBuilder
         this.marksPartials = marksPartials;
     }
 
-    public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long partitionOffset, long rowOffset)
+    public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long keyPosition)
     {
         if (term.remaining() >= MAX_TERM_SIZE)
         {
@@ -183,16 +183,16 @@ public class OnDiskIndexBuilder
             estimatedBytes += 64 + 48 + term.remaining();
         }
 
-        tokens.add((Long) key.getToken().getTokenValue(), partitionOffset, rowOffset);
+        tokens.add((Long) key.getToken().getTokenValue(), keyPosition);
 
         // calculate key range (based on actual key values) for current index
         minKey = (minKey == null || keyComparator.compare(minKey, key.getKey()) > 0) ? key.getKey() : minKey;
         maxKey = (maxKey == null || keyComparator.compare(maxKey, key.getKey()) < 0) ? key.getKey() : maxKey;
 
-        // 84 ((boolean(1)*4) + (long(8)*4) + 24 + 24) bytes for the LongObjectOpenHashMap<long[]> created
-        // when the keyPosition was added + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key).
+        // 60 ((boolean(1)*4) + (long(8)*4) + 24) bytes for the LongOpenHashSet created when the keyPosition was added
+        // + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key).
         // in the case of hash collision for the token we may overestimate but this is extremely rare
-        estimatedBytes += 84 + 40 + 8;
+        estimatedBytes += 60 + 40 + 8;
 
         return this;
     }
@@ -569,7 +569,7 @@ public class OnDiskIndexBuilder
         }
     }
 
-    private class MutableDataBlock extends MutableBlock<InMemoryDataTerm>
+    private static class MutableDataBlock extends MutableBlock<InMemoryDataTerm>
     {
         private static final int MAX_KEYS_SPARSE = 5;
 
@@ -651,7 +651,7 @@ public class OnDiskIndexBuilder
         {
             term.serialize(buffer);
             buffer.writeByte((byte) keys.getTokenCount());
-            for (Pair<Long, KeyOffsets> key : keys)
+            for (Pair<Long, LongSet> key : keys)
                 buffer.writeLong(key.left);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
index c204883..9fa4e87 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
@@ -109,7 +109,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
         currentKeyPosition = curPosition;
     }
 
-    public void nextUnfilteredCluster(Unfiltered unfiltered, long currentRowOffset)
+    public void nextUnfilteredCluster(Unfiltered unfiltered)
     {
         if (!unfiltered.isRow())
             return;
@@ -129,15 +129,10 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
             if (index == null)
                 indexes.put(column, (index = newIndex(columnIndex)));
 
-            index.add(value.duplicate(), currentKey, currentKeyPosition, currentRowOffset);
+            index.add(value.duplicate(), currentKey, currentKeyPosition);
         });
     }
 
-    public void nextUnfilteredCluster(Unfiltered unfilteredCluster)
-    {
-        throw new UnsupportedOperationException("SASI Index does not support direct row access.");
-    }
-
     public void complete()
     {
         if (isComplete)
@@ -202,7 +197,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
             this.currentBuilder = newIndexBuilder();
         }
 
-        public void add(ByteBuffer term, DecoratedKey key, long partitoinOffset, long rowOffset)
+        public void add(ByteBuffer term, DecoratedKey key, long keyPosition)
         {
             if (term.remaining() == 0)
                 return;
@@ -240,7 +235,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
                     }
                 }
 
-                currentBuilder.add(token, key, partitoinOffset, rowOffset);
+                currentBuilder.add(token, key, keyPosition);
                 isAdded = true;
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java b/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java
deleted file mode 100644
index fc5a2c0..0000000
--- a/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyclustering ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.index.sasi.disk;
-
-import java.util.*;
-import java.util.stream.*;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.utils.*;
-
-/**
- * Primary key of the found row, a combination of the Partition Key
- * and clustering that belongs to the row.
- */
-public class RowKey implements Comparable<RowKey>
-{
-
-    public final DecoratedKey decoratedKey;
-    public final Clustering clustering;
-
-    private final ClusteringComparator comparator;
-
-    public RowKey(DecoratedKey primaryKey, Clustering clustering, ClusteringComparator comparator)
-    {
-        this.decoratedKey = primaryKey;
-        this.clustering = clustering;
-        this.comparator = comparator;
-    }
-
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        RowKey rowKey = (RowKey) o;
-
-        if (decoratedKey != null ? !decoratedKey.equals(rowKey.decoratedKey) : rowKey.decoratedKey != null)
-            return false;
-        return clustering != null ? clustering.equals(rowKey.clustering) : rowKey.clustering == null;
-    }
-
-    public int hashCode()
-    {
-        return new HashCodeBuilder().append(decoratedKey).append(clustering).toHashCode();
-    }
-
-    public int compareTo(RowKey other)
-    {
-        int cmp = this.decoratedKey.compareTo(other.decoratedKey);
-        if (cmp == 0 && clustering != null)
-        {
-            // Both clustering and rows should match
-            if (clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING || other.clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING)
-                return 0;
-
-            return comparator.compare(this.clustering, other.clustering);
-        }
-        else
-        {
-            return cmp;
-        }
-    }
-
-    public static RowKeyComparator COMPARATOR = new RowKeyComparator();
-
-    public String toString(CFMetaData metadata)
-    {
-        return String.format("RowKey: { pk : %s, clustering: %s}",
-                             metadata.getKeyValidator().getString(decoratedKey.getKey()),
-                             clustering.toString(metadata));
-    }
-
-    @Override
-    public String toString()
-    {
-        return String.format("RowKey: { pk : %s, clustering: %s}",
-                             ByteBufferUtil.bytesToHex(decoratedKey.getKey()),
-                             String.join(",", Arrays.stream(clustering.getRawValues()).map(ByteBufferUtil::bytesToHex).collect(Collectors.toList())));
-    }
-
-    private static class RowKeyComparator implements Comparator<RowKey>
-    {
-        public int compare(RowKey o1, RowKey o2)
-        {
-            return o1.compareTo(o2);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
index 8a11d60..6e64c56 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
@@ -19,7 +19,8 @@ package org.apache.cassandra.index.sasi.disk;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Iterator;
+import java.util.SortedMap;
 
 import org.apache.cassandra.index.sasi.utils.CombinedTerm;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
@@ -27,6 +28,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.Pair;
 
+import com.carrotsearch.hppc.LongSet;
 import com.google.common.collect.Iterators;
 
 /**
@@ -61,17 +63,17 @@ public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder
         combinedTerm = term;
     }
 
-    public void add(Long token, long partitionOffset, long rowOffset)
+    public void add(Long token, long keyPosition)
     {
         throw new UnsupportedOperationException();
     }
 
-    public void add(SortedMap<Long, KeyOffsets> data)
+    public void add(SortedMap<Long, LongSet> data)
     {
         throw new UnsupportedOperationException();
     }
 
-    public void add(Iterator<Pair<Long, KeyOffsets>> data)
+    public void add(Iterator<Pair<Long, LongSet>> data)
     {
         throw new UnsupportedOperationException();
     }
@@ -81,12 +83,12 @@ public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder
         return tokenCount == 0;
     }
 
-    public Iterator<Pair<Long, KeyOffsets>> iterator()
+    public Iterator<Pair<Long, LongSet>> iterator()
     {
-        @SuppressWarnings("resource") Iterator<Token> iterator = combinedTerm.getTokenIterator();
-        return new AbstractIterator<Pair<Long, KeyOffsets>>()
+        Iterator<Token> iterator = combinedTerm.getTokenIterator();
+        return new AbstractIterator<Pair<Long, LongSet>>()
         {
-            protected Pair<Long, KeyOffsets> computeNext()
+            protected Pair<Long, LongSet> computeNext()
             {
                 if (!iterator.hasNext())
                     return endOfData();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Token.java b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
index 8ea864f..4cd1ea3 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/Token.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
@@ -19,9 +19,12 @@ package org.apache.cassandra.index.sasi.disk;
 
 import com.google.common.primitives.Longs;
 
-import org.apache.cassandra.index.sasi.utils.*;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.utils.CombinedValue;
 
-public abstract class Token implements CombinedValue<Long>, Iterable<RowKey>
+import com.carrotsearch.hppc.LongSet;
+
+public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKey>
 {
     protected final long token;
 
@@ -35,7 +38,7 @@ public abstract class Token implements CombinedValue<Long>, Iterable<RowKey>
         return token;
     }
 
-    public abstract KeyOffsets getOffsets();
+    public abstract LongSet getOffsets();
 
     public int compareTo(CombinedValue<Long> o)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
index 1969627..c69ce00 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
@@ -19,21 +19,22 @@ package org.apache.cassandra.index.sasi.disk;
 
 import java.io.IOException;
 import java.util.*;
-import java.util.stream.*;
 
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.utils.AbstractIterator;
+import org.apache.cassandra.index.sasi.utils.CombinedValue;
+import org.apache.cassandra.index.sasi.utils.MappedBuffer;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.utils.MergeIterator;
+
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
-import com.carrotsearch.hppc.cursors.LongObjectCursor;
-import org.apache.cassandra.index.sasi.*;
-import org.apache.cassandra.index.sasi.disk.Descriptor.*;
-import org.apache.cassandra.index.sasi.utils.AbstractIterator;
-import org.apache.cassandra.index.sasi.utils.*;
-import org.apache.cassandra.utils.*;
-
-import static org.apache.cassandra.index.sasi.disk.Descriptor.Version.*;
-import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.*;
+import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType;
 
 // Note: all of the seek-able offsets contained in TokenTree should be sizeof(long)
 // even if currently only lower int portion of them if used, because that makes
@@ -41,6 +42,9 @@ import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.*;
 // without any on-disk format changes and/or re-indexing if one day we'll have a need to.
 public class TokenTree
 {
+    private static final int LONG_BYTES = Long.SIZE / 8;
+    private static final int SHORT_BYTES = Short.SIZE / 8;
+
     private final Descriptor descriptor;
     private final MappedBuffer file;
     private final long startPos;
@@ -62,7 +66,8 @@ public class TokenTree
 
         file.position(startPos + TokenTreeBuilder.SHARED_HEADER_BYTES);
 
-        validateMagic();
+        if (!validateMagic())
+            throw new IllegalArgumentException("invalid token tree");
 
         tokenCount = file.getLong();
         treeMinToken = file.getLong();
@@ -74,12 +79,12 @@ public class TokenTree
         return tokenCount;
     }
 
-    public RangeIterator<Long, Token> iterator(KeyFetcher keyFetcher)
+    public RangeIterator<Long, Token> iterator(Function<Long, DecoratedKey> keyFetcher)
     {
         return new TokenTreeIterator(file.duplicate(), keyFetcher);
     }
 
-    public OnDiskToken get(final long searchToken, KeyFetcher keyFetcher)
+    public OnDiskToken get(final long searchToken, Function<Long, DecoratedKey> keyFetcher)
     {
         seekToLeaf(searchToken, file);
         long leafStart = file.position();
@@ -90,24 +95,21 @@ public class TokenTree
 
         file.position(leafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES);
 
-        OnDiskToken token = getTokenAt(file, tokenIndex, leafSize, keyFetcher);
-
+        OnDiskToken token = OnDiskToken.getTokenAt(file, tokenIndex, leafSize, keyFetcher);
         return token.get().equals(searchToken) ? token : null;
     }
 
-    private void validateMagic()
+    private boolean validateMagic()
     {
-        if (descriptor.version == aa)
-            return;
-
-        short magic = file.getShort();
-        if (descriptor.version == Version.ab && magic == TokenTreeBuilder.AB_MAGIC)
-            return;
-
-        if (descriptor.version == Version.ac && magic == TokenTreeBuilder.AC_MAGIC)
-            return;
-
-        throw new IllegalArgumentException("invalid token tree. Written magic: '" + ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(magic)) + "'");
+        switch (descriptor.version.toString())
+        {
+            case Descriptor.VERSION_AA:
+                return true;
+            case Descriptor.VERSION_AB:
+                return TokenTreeBuilder.AB_MAGIC == file.getShort();
+            default:
+                return false;
+        }
     }
 
     // finds leaf that *could* contain token
@@ -134,16 +136,18 @@ public class TokenTree
             long minToken = file.getLong();
             long maxToken = file.getLong();
 
-            long seekBase = blockStart + BLOCK_HEADER_BYTES;
+            long seekBase = blockStart + TokenTreeBuilder.BLOCK_HEADER_BYTES;
             if (minToken > token)
             {
                 // seek to beginning of child offsets to locate first child
-                file.position(seekBase + tokenCount * TOKEN_BYTES);
+                file.position(seekBase + tokenCount * LONG_BYTES);
+                blockStart = (startPos + (int) file.getLong());
             }
             else if (maxToken < token)
             {
                 // seek to end of child offsets to locate last child
-                file.position(seekBase + (2 * tokenCount) * TOKEN_BYTES);
+                file.position(seekBase + (2 * tokenCount) * LONG_BYTES);
+                blockStart = (startPos + (int) file.getLong());
             }
             else
             {
@@ -154,11 +158,12 @@ public class TokenTree
 
                 // file pointer is now at beginning of offsets
                 if (offsetIndex == tokenCount)
-                    file.position(file.position() + (offsetIndex * TOKEN_BYTES));
+                    file.position(file.position() + (offsetIndex * LONG_BYTES));
                 else
-                    file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * TOKEN_BYTES);
+                    file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * LONG_BYTES);
+
+                blockStart = (startPos + (int) file.getLong());
             }
-            blockStart = (startPos + (int) file.getLong());
         }
     }
 
@@ -167,7 +172,8 @@ public class TokenTree
         short offsetIndex = 0;
         for (int i = 0; i < tokenCount; i++)
         {
-            if (searchToken < file.getLong())
+            long readToken = file.getLong();
+            if (searchToken < readToken)
                 break;
 
             offsetIndex++;
@@ -187,7 +193,10 @@ public class TokenTree
         while (start <= end)
         {
             middle = start + ((end - start) >> 1);
-            long token = file.getLong(base + middle * LEAF_ENTRY_BYTES + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES));
+
+            // each entry is 16 bytes wide, token is in bytes 4-11
+            long token = file.getLong(base + (middle * (2 * LONG_BYTES) + 4));
+
             if (token == searchToken)
                 break;
 
@@ -200,9 +209,9 @@ public class TokenTree
         return (short) middle;
     }
 
-    private class TokenTreeIterator extends RangeIterator<Long, Token>
+    public class TokenTreeIterator extends RangeIterator<Long, Token>
     {
-        private final KeyFetcher keyFetcher;
+        private final Function<Long, DecoratedKey> keyFetcher;
         private final MappedBuffer file;
 
         private long currentLeafStart;
@@ -215,7 +224,7 @@ public class TokenTree
         protected boolean firstIteration = true;
         private boolean lastLeaf;
 
-        TokenTreeIterator(MappedBuffer file, KeyFetcher keyFetcher)
+        TokenTreeIterator(MappedBuffer file, Function<Long, DecoratedKey> keyFetcher)
         {
             super(treeMinToken, treeMaxToken, tokenCount);
 
@@ -305,13 +314,13 @@ public class TokenTree
 
         private Token getTokenAt(int idx)
         {
-            return TokenTree.this.getTokenAt(file, idx, leafSize, keyFetcher);
+            return OnDiskToken.getTokenAt(file, idx, leafSize, keyFetcher);
         }
 
         private long getTokenPosition(int idx)
         {
-            // skip entry header to get position pointing directly at the entry's token
-            return TokenTree.this.getEntryPosition(idx, file, descriptor) + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES);
+            // skip 4 byte entry header to get position pointing directly at the entry's token
+            return OnDiskToken.getEntryPosition(idx, file) + (2 * SHORT_BYTES);
         }
 
         private void seekToNextLeaf()
@@ -338,15 +347,15 @@ public class TokenTree
         }
     }
 
-    public class OnDiskToken extends Token
+    public static class OnDiskToken extends Token
     {
         private final Set<TokenInfo> info = new HashSet<>(2);
-        private final Set<RowKey> loadedKeys = new TreeSet<>(RowKey.COMPARATOR);
+        private final Set<DecoratedKey> loadedKeys = new TreeSet<>(DecoratedKey.comparator);
 
-        private OnDiskToken(MappedBuffer buffer, long position, short leafSize, KeyFetcher keyFetcher)
+        public OnDiskToken(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
         {
-            super(buffer.getLong(position + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES)));
-            info.add(new TokenInfo(buffer, position, leafSize, keyFetcher, descriptor));
+            super(buffer.getLong(position + (2 * SHORT_BYTES)));
+            info.add(new TokenInfo(buffer, position, leafSize, keyFetcher));
         }
 
         public void merge(CombinedValue<Long> other)
@@ -368,9 +377,9 @@ public class TokenTree
             }
         }
 
-        public Iterator<RowKey> iterator()
+        public Iterator<DecoratedKey> iterator()
         {
-            List<Iterator<RowKey>> keys = new ArrayList<>(info.size());
+            List<Iterator<DecoratedKey>> keys = new ArrayList<>(info.size());
 
             for (TokenInfo i : info)
                 keys.add(i.iterator());
@@ -378,72 +387,68 @@ public class TokenTree
             if (!loadedKeys.isEmpty())
                 keys.add(loadedKeys.iterator());
 
-            return MergeIterator.get(keys, RowKey.COMPARATOR, new MergeIterator.Reducer<RowKey, RowKey>()
+            return MergeIterator.get(keys, DecoratedKey.comparator, new MergeIterator.Reducer<DecoratedKey, DecoratedKey>()
             {
-                RowKey reduced = null;
+                DecoratedKey reduced = null;
 
                 public boolean trivialReduceIsTrivial()
                 {
                     return true;
                 }
 
-                public void reduce(int idx, RowKey current)
+                public void reduce(int idx, DecoratedKey current)
                 {
                     reduced = current;
                 }
 
-                protected RowKey getReduced()
+                protected DecoratedKey getReduced()
                 {
                     return reduced;
                 }
             });
         }
 
-        public KeyOffsets getOffsets()
+        public LongSet getOffsets()
         {
-            KeyOffsets offsets = new KeyOffsets();
+            LongSet offsets = new LongOpenHashSet(4);
             for (TokenInfo i : info)
             {
-                for (LongObjectCursor<long[]> offset : i.fetchOffsets())
-                    offsets.put(offset.key, offset.value);
+                for (long offset : i.fetchOffsets())
+                    offsets.add(offset);
             }
 
             return offsets;
         }
-    }
 
-    private OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, KeyFetcher keyFetcher)
-    {
-        return new OnDiskToken(buffer, getEntryPosition(idx, buffer, descriptor), leafSize, keyFetcher);
-    }
-
-    private long getEntryPosition(int idx, MappedBuffer file, Descriptor descriptor)
-    {
-        if (descriptor.version.compareTo(Version.ac) < 0)
-            return file.position() + (idx * LEGACY_LEAF_ENTRY_BYTES);
+        public static OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, Function<Long, DecoratedKey> keyFetcher)
+        {
+            return new OnDiskToken(buffer, getEntryPosition(idx, buffer), leafSize, keyFetcher);
+        }
 
-        // skip n entries, to the entry with the given index
-        return file.position() + (idx * LEAF_ENTRY_BYTES);
+        private static long getEntryPosition(int idx, MappedBuffer file)
+        {
+            // info (4 bytes) + token (8 bytes) + offset (4 bytes) = 16 bytes
+            return file.position() + (idx * (2 * LONG_BYTES));
+        }
     }
 
     private static class TokenInfo
     {
         private final MappedBuffer buffer;
-        private final KeyFetcher keyFetcher;
-        private final Descriptor descriptor;
+        private final Function<Long, DecoratedKey> keyFetcher;
+
         private final long position;
         private final short leafSize;
 
-        public TokenInfo(MappedBuffer buffer, long position, short leafSize, KeyFetcher keyFetcher, Descriptor descriptor)
+        public TokenInfo(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
         {
             this.keyFetcher = keyFetcher;
             this.buffer = buffer;
             this.position = position;
             this.leafSize = leafSize;
-            this.descriptor = descriptor;
         }
 
-        public Iterator<RowKey> iterator()
+        public Iterator<DecoratedKey> iterator()
         {
             return new KeyIterator(keyFetcher, fetchOffsets());
         }
@@ -460,154 +465,59 @@ public class TokenTree
 
             TokenInfo o = (TokenInfo) other;
             return keyFetcher == o.keyFetcher && position == o.position;
-
         }
 
-        /**
-         * Legacy leaf storage format (used for reading data formats before AC):
-         *
-         *    [(short) leaf type][(short) offset extra bytes][(long) token][(int) offsetData]
-         *
-         * Many pairs can be encoded into long+int.
-         *
-         * Simple entry: offset fits into (int)
-         *
-         *    [(short) leaf type][(short) offset extra bytes][(long) token][(int) offsetData]
-         *
-         * FactoredOffset: a single offset, offset fits into (long)+(int) bits:
-         *
-         *    [(short) leaf type][(short) 16 bytes of remained offset][(long) token][(int) top 32 bits of offset]
-         *
-         * PackedCollisionEntry: packs the two offset entries into int and a short (if both of them fit into
-         * (long) and one of them fits into (int))
-         *
-         *    [(short) leaf type][(short) 16 the offset that'd fit into short][(long) token][(int) 32 bits of offset that'd fit into int]
-         *
-         * Otherwise, the rest gets packed into limited-size overflow collision entry
-         *
-         *    [(short) leaf type][(short) count][(long) token][(int) start index]
-         */
-        private KeyOffsets fetchOffsetsLegacy()
+        private long[] fetchOffsets()
         {
             short info = buffer.getShort(position);
             // offset extra is unsigned short (right-most 16 bits of 48 bits allowed for an offset)
-            int offsetExtra = buffer.getShort(position + Short.BYTES) & 0xFFFF;
+            int offsetExtra = buffer.getShort(position + SHORT_BYTES) & 0xFFFF;
             // is the it left-most (32-bit) base of the actual offset in the index file
-            int offsetData = buffer.getInt(position + (2 * Short.BYTES) + Long.BYTES);
+            int offsetData = buffer.getInt(position + (2 * SHORT_BYTES) + LONG_BYTES);
 
             EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK);
 
-            KeyOffsets rowOffsets = new KeyOffsets();
             switch (type)
             {
                 case SIMPLE:
-                    rowOffsets.put(offsetData, KeyOffsets.NO_OFFSET);
-                    break;
+                    return new long[] { offsetData };
+
                 case OVERFLOW:
-                    long offsetPos = (buffer.position() + (2 * (leafSize * Long.BYTES)) + (offsetData * Long.BYTES));
+                    long[] offsets = new long[offsetExtra]; // offsetShort contains count of tokens
+                    long offsetPos = (buffer.position() + (2 * (leafSize * LONG_BYTES)) + (offsetData * LONG_BYTES));
 
                     for (int i = 0; i < offsetExtra; i++)
-                    {
-                        long offset = buffer.getLong(offsetPos + (i * Long.BYTES));;
-                        rowOffsets.put(offset, KeyOffsets.NO_OFFSET);
-                    }
-                    break;
-                case FACTORED:
-                    long offset = (((long) offsetData) << Short.SIZE) + offsetExtra;
-                    rowOffsets.put(offset, KeyOffsets.NO_OFFSET);
-                    break;
-                case PACKED:
-                    rowOffsets.put(offsetExtra, KeyOffsets.NO_OFFSET);
-                    rowOffsets.put(offsetData, KeyOffsets.NO_OFFSET);
-                default:
-                    throw new IllegalStateException("Unknown entry type: " + type);
-            }
-            return rowOffsets;
-        }
+                        offsets[i] = buffer.getLong(offsetPos + (i * LONG_BYTES));
 
-        private KeyOffsets fetchOffsets()
-        {
-            if (descriptor.version.compareTo(Version.ac) < 0)
-                return fetchOffsetsLegacy();
-
-            short info = buffer.getShort(position);
-            EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK);
+                    return offsets;
 
-            KeyOffsets rowOffsets = new KeyOffsets();
-            long baseOffset = position + LEAF_ENTRY_TYPE_BYTES + TOKEN_BYTES;
-            switch (type)
-            {
-                case SIMPLE:
-                    long partitionOffset = buffer.getLong(baseOffset);
-                    long rowOffset = buffer.getLong(baseOffset + LEAF_PARTITON_OFFSET_BYTES);
+                case FACTORED:
+                    return new long[] { (((long) offsetData) << Short.SIZE) + offsetExtra };
 
-                    rowOffsets.put(partitionOffset, rowOffset);
-                    break;
                 case PACKED:
-                    long partitionOffset1 = buffer.getInt(baseOffset);
-                    long rowOffset1 = buffer.getInt(baseOffset + LEAF_PARTITON_OFFSET_PACKED_BYTES);
-
-                    long partitionOffset2 = buffer.getInt(baseOffset + LEAF_PARTITON_OFFSET_PACKED_BYTES + LEAF_ROW_OFFSET_PACKED_BYTES);
-                    long rowOffset2 = buffer.getInt(baseOffset + 2 * LEAF_PARTITON_OFFSET_PACKED_BYTES + LEAF_ROW_OFFSET_PACKED_BYTES);
+                    return new long[] { offsetExtra, offsetData };
 
-                    rowOffsets.put(partitionOffset1, rowOffset1);
-                    rowOffsets.put(partitionOffset2, rowOffset2);
-                    break;
-                case OVERFLOW:
-                    long collisionOffset = buffer.getLong(baseOffset);
-                    long count = buffer.getLong(baseOffset + LEAF_PARTITON_OFFSET_BYTES);
-
-                    // Skip leaves and collision offsets that do not belong to current token
-                    long offsetPos = buffer.position() + leafSize *  LEAF_ENTRY_BYTES + collisionOffset * COLLISION_ENTRY_BYTES;
-
-                    for (int i = 0; i < count; i++)
-                    {
-                        long currentPartitionOffset = buffer.getLong(offsetPos + i * COLLISION_ENTRY_BYTES);
-                        long currentRowOffset = buffer.getLong(offsetPos + i * COLLISION_ENTRY_BYTES + LEAF_PARTITON_OFFSET_BYTES);
-
-                        rowOffsets.put(currentPartitionOffset, currentRowOffset);
-                    }
-                    break;
                 default:
                     throw new IllegalStateException("Unknown entry type: " + type);
             }
-
-
-            return rowOffsets;
         }
     }
 
-    private static class KeyIterator extends AbstractIterator<RowKey>
+    private static class KeyIterator extends AbstractIterator<DecoratedKey>
     {
-        private final KeyFetcher keyFetcher;
-        private final Iterator<LongObjectCursor<long[]>> offsets;
-        private long currentPatitionKey;
-        private PrimitiveIterator.OfLong currentCursor = null;
+        private final Function<Long, DecoratedKey> keyFetcher;
+        private final long[] offsets;
+        private int index = 0;
 
-        public KeyIterator(KeyFetcher keyFetcher, KeyOffsets offsets)
+        public KeyIterator(Function<Long, DecoratedKey> keyFetcher, long[] offsets)
         {
             this.keyFetcher = keyFetcher;
-            this.offsets = offsets.iterator();
+            this.offsets = offsets;
         }
 
-        public RowKey computeNext()
+        public DecoratedKey computeNext()
         {
-            if (currentCursor != null && currentCursor.hasNext())
-            {
-                return keyFetcher.getRowKey(currentPatitionKey, currentCursor.nextLong());
-            }
-            else if (offsets.hasNext())
-            {
-                LongObjectCursor<long[]> cursor = offsets.next();
-                currentPatitionKey = cursor.key;
-                currentCursor = LongStream.of(cursor.value).iterator();
-
-                return keyFetcher.getRowKey(currentPatitionKey, currentCursor.nextLong());
-            }
-            else
-            {
-                return endOfData();
-            }
+            return index < offsets.length ? keyFetcher.apply(offsets[index++]) : endOfData();
         }
     }
-}
+}
\ No newline at end of file


[2/3] cassandra git commit: Revert "Add row offset support to SASI"

Posted by xe...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
index 07804d6..2210964 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
@@ -20,62 +20,28 @@ package org.apache.cassandra.index.sasi.disk;
 import java.io.IOException;
 import java.util.*;
 
-import org.apache.cassandra.io.util.*;
-import org.apache.cassandra.utils.*;
-import org.apache.cassandra.utils.obs.BitUtil;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.Pair;
 
-public interface TokenTreeBuilder extends Iterable<Pair<Long, KeyOffsets>>
-{
-    final static int BLOCK_BYTES = 4096;
-
-    final static int LEAF_ENTRY_TYPE_BYTES = Short.BYTES;
-    final static int TOKEN_OFFSET_BYTES = LEAF_ENTRY_TYPE_BYTES;
-    final static int LEAF_PARTITON_OFFSET_BYTES = Long.BYTES;
-    final static int LEAF_ROW_OFFSET_BYTES = Long.BYTES;
-
-    final static int LEAF_PARTITON_OFFSET_PACKED_BYTES = Integer.BYTES;
-    final static int LEAF_ROW_OFFSET_PACKED_BYTES = Integer.BYTES;
-    final static int COLLISION_ENTRY_BYTES = LEAF_PARTITON_OFFSET_BYTES + LEAF_ROW_OFFSET_BYTES;
-
-    final static int HEADER_INFO_BYTE_BYTES = Byte.BYTES;
-    final static int HEADER_TOKEN_COUNT_BYTES = Short.BYTES;
-
-    final static int ROOT_HEADER_MAGIC_SIZE = Short.BYTES;
-    final static int ROOT_HEADER_TOKEN_COUNT_SIZE = Long.BYTES;
+import com.carrotsearch.hppc.LongSet;
 
-    // Partitioner token size in bytes
-    final static int TOKEN_BYTES = Long.BYTES;
-
-    // Leaf entry size in bytes, see {@class SimpleLeafEntry} for a full description
-    final static int LEAF_ENTRY_BYTES = LEAF_ENTRY_TYPE_BYTES + TOKEN_BYTES + LEAF_PARTITON_OFFSET_BYTES + LEAF_ROW_OFFSET_BYTES;
-    // Shared header size in bytes, see {@class AbstractTreeBuilder$Header} for a full description
-    final static int SHARED_HEADER_BYTES = HEADER_INFO_BYTE_BYTES + HEADER_TOKEN_COUNT_BYTES + 2 * TOKEN_BYTES;
-    // Block header size in bytes, see {@class AbstractTreeBuilder$RootHeader}
-    final static int BLOCK_HEADER_BYTES = BitUtil.nextHighestPowerOfTwo(SHARED_HEADER_BYTES + ROOT_HEADER_MAGIC_SIZE + ROOT_HEADER_TOKEN_COUNT_SIZE + 2 * TOKEN_BYTES);
-
-    // Overflow trailer capacity is currently 8 overflow items. Each overflow item consists of two longs.
-    final static int OVERFLOW_TRAILER_CAPACITY = 8;
-    final static int OVERFLOW_TRAILER_BYTES = OVERFLOW_TRAILER_CAPACITY * COLLISION_ENTRY_BYTES;;
-    final static int TOKENS_PER_BLOCK = (TokenTreeBuilder.BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / LEAF_ENTRY_BYTES;
-
-    final static int LEGACY_LEAF_ENTRY_BYTES = Short.BYTES + Short.BYTES + TOKEN_BYTES + Integer.BYTES;
-    final static int LEGACY_TOKEN_OFFSET_BYTES = 2 * Short.BYTES;
-    final static byte LAST_LEAF_SHIFT = 1;
-
-    /**
-     * {@code Header} size in bytes.
-     */
-    final byte ENTRY_TYPE_MASK = 0x03;
-    final short AB_MAGIC = 0x5A51;
-    final short AC_MAGIC = 0x7C63;
+public interface TokenTreeBuilder extends Iterable<Pair<Long, LongSet>>
+{
+    int BLOCK_BYTES = 4096;
+    int BLOCK_HEADER_BYTES = 64;
+    int OVERFLOW_TRAILER_BYTES = 64;
+    int OVERFLOW_TRAILER_CAPACITY = OVERFLOW_TRAILER_BYTES / 8;
+    int TOKENS_PER_BLOCK = (BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / 16;
+    long MAX_OFFSET = (1L << 47) - 1; // 48 bits for (signed) offset
+    byte LAST_LEAF_SHIFT = 1;
+    byte SHARED_HEADER_BYTES = 19;
+    byte ENTRY_TYPE_MASK = 0x03;
+    short AB_MAGIC = 0x5A51;
 
     // note: ordinal positions are used here, do not change order
     enum EntryType
     {
-        SIMPLE,
-        FACTORED,
-        PACKED,
-        OVERFLOW;
+        SIMPLE, FACTORED, PACKED, OVERFLOW;
 
         public static EntryType of(int ordinal)
         {
@@ -95,9 +61,9 @@ public interface TokenTreeBuilder extends Iterable<Pair<Long, KeyOffsets>>
         }
     }
 
-    void add(Long token, long partitionOffset, long rowOffset);
-    void add(SortedMap<Long, KeyOffsets> data);
-    void add(Iterator<Pair<Long, KeyOffsets>> data);
+    void add(Long token, long keyPosition);
+    void add(SortedMap<Long, LongSet> data);
+    void add(Iterator<Pair<Long, LongSet>> data);
     void add(TokenTreeBuilder ttb);
 
     boolean isEmpty();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
index a7b22f3..e55a806 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
@@ -19,14 +19,14 @@ package org.apache.cassandra.index.sasi.memory;
 
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.index.sasi.conf.ColumnIndex;
-import org.apache.cassandra.index.sasi.disk.*;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 import org.apache.cassandra.index.sasi.utils.TypeUtil;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.FBUtilities;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +42,7 @@ public class IndexMemtable
         this.index = MemIndex.forColumn(columnIndex.keyValidator(), columnIndex);
     }
 
-    public long index(RowKey key, ByteBuffer value)
+    public long index(DecoratedKey key, ByteBuffer value)
     {
         if (value == null || value.remaining() == 0)
             return 0;
@@ -55,7 +55,7 @@ public class IndexMemtable
             {
                 logger.error("Can't add column {} to index for key: {}, value size {}, validator: {}.",
                              index.columnIndex.getColumnName(),
-                             index.columnIndex.keyValidator().getString(key.decoratedKey.getKey()),
+                             index.columnIndex.keyValidator().getString(key.getKey()),
                              FBUtilities.prettyPrintMemory(size),
                              validator);
                 return 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
index b4365dc..a2f2c0e 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
@@ -18,27 +18,28 @@
 package org.apache.cassandra.index.sasi.memory;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
 
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.index.sasi.disk.*;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.utils.AbstractIterator;
 import org.apache.cassandra.index.sasi.utils.CombinedValue;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
 import com.google.common.collect.PeekingIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class KeyRangeIterator extends RangeIterator<Long, Token>
 {
     private final DKIterator iterator;
 
-    public KeyRangeIterator(ConcurrentSkipListSet<RowKey> keys)
+    public KeyRangeIterator(ConcurrentSkipListSet<DecoratedKey> keys)
     {
-        super((Long) keys.first().decoratedKey.getToken().getTokenValue(), (Long) keys.last().decoratedKey.getToken().getTokenValue(), keys.size());
+        super((Long) keys.first().getToken().getTokenValue(), (Long) keys.last().getToken().getTokenValue(), keys.size());
         this.iterator = new DKIterator(keys.iterator());
     }
 
@@ -51,8 +52,8 @@ public class KeyRangeIterator extends RangeIterator<Long, Token>
     {
         while (iterator.hasNext())
         {
-            RowKey key = iterator.peek();
-            if (Long.compare((Long) key.decoratedKey.getToken().getTokenValue(), nextToken) >= 0)
+            DecoratedKey key = iterator.peek();
+            if (Long.compare((long) key.getToken().getTokenValue(), nextToken) >= 0)
                 break;
 
             // consume smaller key
@@ -63,16 +64,16 @@ public class KeyRangeIterator extends RangeIterator<Long, Token>
     public void close() throws IOException
     {}
 
-    private static class DKIterator extends AbstractIterator<RowKey> implements PeekingIterator<RowKey>
+    private static class DKIterator extends AbstractIterator<DecoratedKey> implements PeekingIterator<DecoratedKey>
     {
-        private final Iterator<RowKey> keys;
+        private final Iterator<DecoratedKey> keys;
 
-        public DKIterator(Iterator<RowKey> keys)
+        public DKIterator(Iterator<DecoratedKey> keys)
         {
             this.keys = keys;
         }
 
-        protected RowKey computeNext()
+        protected DecoratedKey computeNext()
         {
             return keys.hasNext() ? keys.next() : endOfData();
         }
@@ -80,21 +81,25 @@ public class KeyRangeIterator extends RangeIterator<Long, Token>
 
     private static class DKToken extends Token
     {
-        private final SortedSet<RowKey> keys;
+        private final SortedSet<DecoratedKey> keys;
 
-        public DKToken(RowKey key)
+        public DKToken(final DecoratedKey key)
         {
-            super((Long) key.decoratedKey.getToken().getTokenValue());
+            super((long) key.getToken().getTokenValue());
 
-            keys = new TreeSet<RowKey>(RowKey.COMPARATOR)
+            keys = new TreeSet<DecoratedKey>(DecoratedKey.comparator)
             {{
                 add(key);
             }};
         }
 
-        public KeyOffsets getOffsets()
+        public LongSet getOffsets()
         {
-            throw new IllegalStateException("DecoratedKey tokens are used in memtables and do not have on-disk offsets");
+            LongSet offsets = new LongOpenHashSet(4);
+            for (DecoratedKey key : keys)
+                offsets.add((long) key.getToken().getTokenValue());
+
+            return offsets;
         }
 
         public void merge(CombinedValue<Long> other)
@@ -111,14 +116,14 @@ public class KeyRangeIterator extends RangeIterator<Long, Token>
             }
             else
             {
-                for (RowKey key : o)
+                for (DecoratedKey key : o)
                     keys.add(key);
             }
         }
 
-        public Iterator<RowKey> iterator()
+        public Iterator<DecoratedKey> iterator()
         {
             return keys.iterator();
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
index bfba4cb..cc1eb3f 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
@@ -19,8 +19,8 @@ package org.apache.cassandra.index.sasi.memory;
 
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.index.sasi.conf.ColumnIndex;
-import org.apache.cassandra.index.sasi.disk.*;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
@@ -37,7 +37,7 @@ public abstract class MemIndex
         this.columnIndex = columnIndex;
     }
 
-    public abstract long add(RowKey key, ByteBuffer value);
+    public abstract long add(DecoratedKey key, ByteBuffer value);
     public abstract RangeIterator<Long, Token> search(Expression expression);
 
     public static MemIndex forColumn(AbstractType<?> keyValidator, ColumnIndex columnIndex)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
index 9c3562a..69b57d0 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
@@ -22,8 +22,8 @@ import java.util.*;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.index.sasi.conf.ColumnIndex;
-import org.apache.cassandra.index.sasi.disk.*;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
@@ -34,7 +34,7 @@ public class SkipListMemIndex extends MemIndex
 {
     public static final int CSLM_OVERHEAD = 128; // average overhead of CSLM
 
-    private final ConcurrentSkipListMap<ByteBuffer, ConcurrentSkipListSet<RowKey>> index;
+    private final ConcurrentSkipListMap<ByteBuffer, ConcurrentSkipListSet<DecoratedKey>> index;
 
     public SkipListMemIndex(AbstractType<?> keyValidator, ColumnIndex columnIndex)
     {
@@ -42,14 +42,14 @@ public class SkipListMemIndex extends MemIndex
         index = new ConcurrentSkipListMap<>(columnIndex.getValidator());
     }
 
-    public long add(RowKey key, ByteBuffer value)
+    public long add(DecoratedKey key, ByteBuffer value)
     {
         long overhead = CSLM_OVERHEAD; // DKs are shared
-        ConcurrentSkipListSet<RowKey> keys = index.get(value);
+        ConcurrentSkipListSet<DecoratedKey> keys = index.get(value);
 
         if (keys == null)
         {
-            ConcurrentSkipListSet<RowKey> newKeys = new ConcurrentSkipListSet<>();
+            ConcurrentSkipListSet<DecoratedKey> newKeys = new ConcurrentSkipListSet<>(DecoratedKey.comparator);
             keys = index.putIfAbsent(value, newKeys);
             if (keys == null)
             {
@@ -68,7 +68,7 @@ public class SkipListMemIndex extends MemIndex
         ByteBuffer min = expression.lower == null ? null : expression.lower.value;
         ByteBuffer max = expression.upper == null ? null : expression.upper.value;
 
-        SortedMap<ByteBuffer, ConcurrentSkipListSet<RowKey>> search;
+        SortedMap<ByteBuffer, ConcurrentSkipListSet<DecoratedKey>> search;
 
         if (min == null && max == null)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
index e1c273d..ca60ac5 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
@@ -23,8 +23,9 @@ import java.util.List;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.index.sasi.conf.ColumnIndex;
-import org.apache.cassandra.index.sasi.disk.*;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.plan.Expression.Op;
@@ -37,7 +38,7 @@ import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
 import com.googlecode.concurrenttrees.suffix.ConcurrentSuffixTree;
 import com.googlecode.concurrenttrees.radix.node.concrete.SmartArrayBasedNodeFactory;
 import com.googlecode.concurrenttrees.radix.node.Node;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.FBUtilities;
 
 
 import org.slf4j.Logger;
@@ -70,7 +71,7 @@ public class TrieMemIndex extends MemIndex
         }
     }
 
-    public long add(RowKey key, ByteBuffer value)
+    public long add(DecoratedKey key, ByteBuffer value)
     {
         AbstractAnalyzer analyzer = columnIndex.getAnalyzer();
         analyzer.reset(value.duplicate());
@@ -84,7 +85,7 @@ public class TrieMemIndex extends MemIndex
             {
                 logger.info("Can't add term of column {} to index for key: {}, term size {}, max allowed size {}, use analyzed = true (if not yet set) for that column.",
                             columnIndex.getColumnName(),
-                            keyValidator.getString(key.decoratedKey.getKey()),
+                            keyValidator.getString(key.getKey()),
                             FBUtilities.prettyPrintMemory(term.remaining()),
                             FBUtilities.prettyPrintMemory(OnDiskIndexBuilder.MAX_TERM_SIZE));
                 continue;
@@ -112,13 +113,13 @@ public class TrieMemIndex extends MemIndex
             definition = column;
         }
 
-        public long add(String value, RowKey key)
+        public long add(String value, DecoratedKey key)
         {
             long overhead = CSLM_OVERHEAD;
-            ConcurrentSkipListSet<RowKey> keys = get(value);
+            ConcurrentSkipListSet<DecoratedKey> keys = get(value);
             if (keys == null)
             {
-                ConcurrentSkipListSet<RowKey> newKeys = new ConcurrentSkipListSet<>();
+                ConcurrentSkipListSet<DecoratedKey> newKeys = new ConcurrentSkipListSet<>(DecoratedKey.comparator);
                 keys = putIfAbsent(value, newKeys);
                 if (keys == null)
                 {
@@ -140,10 +141,10 @@ public class TrieMemIndex extends MemIndex
         {
             ByteBuffer prefix = expression.lower == null ? null : expression.lower.value;
 
-            Iterable<ConcurrentSkipListSet<RowKey>> search = search(expression.getOp(), definition.cellValueType().getString(prefix));
+            Iterable<ConcurrentSkipListSet<DecoratedKey>> search = search(expression.getOp(), definition.cellValueType().getString(prefix));
 
             RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
-            for (ConcurrentSkipListSet<RowKey> keys : search)
+            for (ConcurrentSkipListSet<DecoratedKey> keys : search)
             {
                 if (!keys.isEmpty())
                     builder.add(new KeyRangeIterator(keys));
@@ -152,14 +153,14 @@ public class TrieMemIndex extends MemIndex
             return builder.build();
         }
 
-        protected abstract ConcurrentSkipListSet<RowKey> get(String value);
-        protected abstract Iterable<ConcurrentSkipListSet<RowKey>> search(Op operator, String value);
-        protected abstract ConcurrentSkipListSet<RowKey> putIfAbsent(String value, ConcurrentSkipListSet<RowKey> key);
+        protected abstract ConcurrentSkipListSet<DecoratedKey> get(String value);
+        protected abstract Iterable<ConcurrentSkipListSet<DecoratedKey>> search(Op operator, String value);
+        protected abstract ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> key);
     }
 
     protected static class ConcurrentPrefixTrie extends ConcurrentTrie
     {
-        private final ConcurrentRadixTree<ConcurrentSkipListSet<RowKey>> trie;
+        private final ConcurrentRadixTree<ConcurrentSkipListSet<DecoratedKey>> trie;
 
         private ConcurrentPrefixTrie(ColumnDefinition column)
         {
@@ -167,23 +168,23 @@ public class TrieMemIndex extends MemIndex
             trie = new ConcurrentRadixTree<>(NODE_FACTORY);
         }
 
-        public ConcurrentSkipListSet<RowKey> get(String value)
+        public ConcurrentSkipListSet<DecoratedKey> get(String value)
         {
             return trie.getValueForExactKey(value);
         }
 
-        public ConcurrentSkipListSet<RowKey> putIfAbsent(String value, ConcurrentSkipListSet<RowKey> newKeys)
+        public ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> newKeys)
         {
             return trie.putIfAbsent(value, newKeys);
         }
 
-        public Iterable<ConcurrentSkipListSet<RowKey>> search(Op operator, String value)
+        public Iterable<ConcurrentSkipListSet<DecoratedKey>> search(Op operator, String value)
         {
             switch (operator)
             {
                 case EQ:
                 case MATCH:
-                    ConcurrentSkipListSet<RowKey> keys = trie.getValueForExactKey(value);
+                    ConcurrentSkipListSet<DecoratedKey> keys = trie.getValueForExactKey(value);
                     return keys == null ? Collections.emptyList() : Collections.singletonList(keys);
 
                 case PREFIX:
@@ -197,7 +198,7 @@ public class TrieMemIndex extends MemIndex
 
     protected static class ConcurrentSuffixTrie extends ConcurrentTrie
     {
-        private final ConcurrentSuffixTree<ConcurrentSkipListSet<RowKey>> trie;
+        private final ConcurrentSuffixTree<ConcurrentSkipListSet<DecoratedKey>> trie;
 
         private ConcurrentSuffixTrie(ColumnDefinition column)
         {
@@ -205,23 +206,23 @@ public class TrieMemIndex extends MemIndex
             trie = new ConcurrentSuffixTree<>(NODE_FACTORY);
         }
 
-        public ConcurrentSkipListSet<RowKey> get(String value)
+        public ConcurrentSkipListSet<DecoratedKey> get(String value)
         {
             return trie.getValueForExactKey(value);
         }
 
-        public ConcurrentSkipListSet<RowKey> putIfAbsent(String value, ConcurrentSkipListSet<RowKey> newKeys)
+        public ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> newKeys)
         {
             return trie.putIfAbsent(value, newKeys);
         }
 
-        public Iterable<ConcurrentSkipListSet<RowKey>> search(Op operator, String value)
+        public Iterable<ConcurrentSkipListSet<DecoratedKey>> search(Op operator, String value)
         {
             switch (operator)
             {
                 case EQ:
                 case MATCH:
-                    ConcurrentSkipListSet<RowKey> keys = trie.getValueForExactKey(value);
+                    ConcurrentSkipListSet<DecoratedKey> keys = trie.getValueForExactKey(value);
                     return keys == null ? Collections.emptyList() : Collections.singletonList(keys);
 
                 case SUFFIX:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
index af4e249..fa1181f 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
@@ -17,18 +17,15 @@
  */
 package org.apache.cassandra.index.sasi.plan;
 
-import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.sasi.SASIIndex;
@@ -45,12 +42,10 @@ import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.Pair;
 
 public class QueryController
 {
-    private static final Logger logger = LoggerFactory.getLogger(QueryController.class);
-
     private final long executionQuota;
     private final long executionStart;
 
@@ -99,26 +94,22 @@ public class QueryController
         return index.isPresent() ? ((SASIIndex) index.get()).getIndex() : null;
     }
 
-    public UnfilteredRowIterator getPartition(DecoratedKey key, NavigableSet<Clustering> clusterings, ReadExecutionController executionController)
+
+    public UnfilteredRowIterator getPartition(DecoratedKey key, ReadExecutionController executionController)
     {
         if (key == null)
             throw new NullPointerException();
-
         try
         {
-            ClusteringIndexFilter filter;
-            if (clusterings == null)
-                filter = new ClusteringIndexSliceFilter(Slices.ALL, false);
-            else
-                filter = new ClusteringIndexNamesFilter(clusterings, false);
-
-            SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(cfs.metadata,
+            SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(command.isForThrift(),
+                                                                                     cfs.metadata,
                                                                                      command.nowInSec(),
                                                                                      command.columnFilter(),
-                                                                                     command.rowFilter(),
+                                                                                     command.rowFilter().withoutExpressions(),
                                                                                      DataLimits.NONE,
                                                                                      key,
-                                                                                     filter);
+                                                                                     command.clusteringIndexFilter(key));
+
             return partition.queryMemtableAndDisk(cfs, executionController);
         }
         finally
@@ -144,24 +135,20 @@ public class QueryController
 
         RangeIterator.Builder<Long, Token> builder = op == OperationType.OR
                                                 ? RangeUnionIterator.<Long, Token>builder()
-                                                : RangeIntersectionIterator.<Long, org.apache.cassandra.index.sasi.disk.Token>builder();
+                                                : RangeIntersectionIterator.<Long, Token>builder();
 
         List<RangeIterator<Long, Token>> perIndexUnions = new ArrayList<>();
 
         for (Map.Entry<Expression, Set<SSTableIndex>> e : getView(op, expressions).entrySet())
         {
-            try (RangeIterator<Long, Token> index = TermIterator.build(e.getKey(), e.getValue()))
-            {
-                if (index == null)
-                    continue;
+            @SuppressWarnings("resource") // RangeIterators are closed by releaseIndexes
+            RangeIterator<Long, Token> index = TermIterator.build(e.getKey(), e.getValue());
 
-                builder.add(index);
-                perIndexUnions.add(index);
-            }
-            catch (IOException ex)
-            {
-                logger.error("Failed to release index: ", ex);
-            }
+            if (index == null)
+                continue;
+
+            builder.add(index);
+            perIndexUnions.add(index);
         }
 
         resources.put(expressions, perIndexUnions);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
index ccb369c..4410756 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
@@ -19,19 +19,16 @@ package org.apache.cassandra.index.sasi.plan;
 
 import java.util.*;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.*;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.index.sasi.disk.*;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.index.sasi.disk.Token;
-import org.apache.cassandra.index.sasi.plan.Operation.*;
-import org.apache.cassandra.utils.btree.*;
+import org.apache.cassandra.index.sasi.plan.Operation.OperationType;
+import org.apache.cassandra.exceptions.RequestTimeoutException;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.AbstractIterator;
 
 public class QueryPlan
 {
@@ -71,16 +68,14 @@ public class QueryPlan
         return new ResultIterator(analyze(), controller, executionController);
     }
 
-    private static class ResultIterator implements UnfilteredPartitionIterator
+    private static class ResultIterator extends AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator
     {
         private final AbstractBounds<PartitionPosition> keyRange;
         private final Operation operationTree;
         private final QueryController controller;
         private final ReadExecutionController executionController;
 
-        private Iterator<RowKey> currentKeys = null;
-        private UnfilteredRowIterator nextPartition = null;
-        private DecoratedKey lastPartitionKey = null;
+        private Iterator<DecoratedKey> currentKeys = null;
 
         public ResultIterator(Operation operationTree, QueryController controller, ReadExecutionController executionController)
         {
@@ -92,152 +87,53 @@ public class QueryPlan
                 operationTree.skipTo((Long) keyRange.left.getToken().getTokenValue());
         }
 
-        public boolean hasNext()
-        {
-            return prepareNext();
-        }
-
-        public UnfilteredRowIterator next()
-        {
-            if (nextPartition == null)
-                prepareNext();
-
-            UnfilteredRowIterator toReturn = nextPartition;
-            nextPartition = null;
-            return toReturn;
-        }
-
-        private boolean prepareNext()
+        protected UnfilteredRowIterator computeNext()
         {
             if (operationTree == null)
-                return false;
-
-            if (nextPartition != null)
-                nextPartition.close();
+                return endOfData();
 
             for (;;)
             {
                 if (currentKeys == null || !currentKeys.hasNext())
                 {
                     if (!operationTree.hasNext())
-                        return false;
+                         return endOfData();
 
                     Token token = operationTree.next();
                     currentKeys = token.iterator();
                 }
 
-                CFMetaData metadata = controller.metadata();
-                BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(metadata.comparator);
-                // results have static clustering, the whole partition has to be read
-                boolean fetchWholePartition = false;
-
-                while (true)
+                while (currentKeys.hasNext())
                 {
-                    if (!currentKeys.hasNext())
-                    {
-                        // No more keys for this token.
-                        // If no clusterings were collected yet, exit this inner loop so the operation
-                        // tree iterator can move on to the next token.
-                        // If some clusterings were collected, build an iterator for those rows
-                        // and return.
-                        if ((clusterings.isEmpty() && !fetchWholePartition) || lastPartitionKey == null)
-                            break;
-
-                        UnfilteredRowIterator partition = fetchPartition(lastPartitionKey, clusterings.build(), fetchWholePartition);
-                        // Prepare for next partition, reset partition key and clusterings
-                        lastPartitionKey = null;
-                        clusterings = BTreeSet.builder(metadata.comparator);
-
-                        if (partition.isEmpty())
-                        {
-                            partition.close();
-                            continue;
-                        }
-
-                        nextPartition = partition;
-                        return true;
-                    }
-
-                    RowKey fullKey = currentKeys.next();
-                    DecoratedKey key = fullKey.decoratedKey;
+                    DecoratedKey key = currentKeys.next();
 
                     if (!keyRange.right.isMinimum() && keyRange.right.compareTo(key) < 0)
-                        return false;
+                        return endOfData();
 
-                    if (lastPartitionKey != null && metadata.getKeyValidator().compare(lastPartitionKey.getKey(), key.getKey()) != 0)
+                    try (UnfilteredRowIterator partition = controller.getPartition(key, executionController))
                     {
-                        UnfilteredRowIterator partition = fetchPartition(lastPartitionKey, clusterings.build(), fetchWholePartition);
+                        Row staticRow = partition.staticRow();
+                        List<Unfiltered> clusters = new ArrayList<>();
 
-                        if (partition.isEmpty())
-                            partition.close();
-                        else
+                        while (partition.hasNext())
                         {
-                            nextPartition = partition;
-                            return true;
+                            Unfiltered row = partition.next();
+                            if (operationTree.satisfiedBy(row, staticRow, true))
+                                clusters.add(row);
                         }
-                    }
-
-                    lastPartitionKey = key;
-
-                    // We fetch whole partition for versions before AC and in case static column index is queried in AC
-                    if (fullKey.clustering == null || fullKey.clustering.clustering().kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING)
-                        fetchWholePartition = true;
-                    else
-                        clusterings.add(fullKey.clustering);
 
+                        if (!clusters.isEmpty())
+                            return new PartitionIterator(partition, clusters);
+                    }
                 }
             }
         }
 
-        private UnfilteredRowIterator fetchPartition(DecoratedKey key, NavigableSet<Clustering> clusterings, boolean fetchWholePartition)
-        {
-            if (fetchWholePartition)
-                clusterings = null;
-
-            try (UnfilteredRowIterator partition = controller.getPartition(key, clusterings, executionController))
-            {
-                Row staticRow = partition.staticRow();
-                List<Unfiltered> clusters = new ArrayList<>();
-
-                while (partition.hasNext())
-                {
-                    Unfiltered row = partition.next();
-                    if (operationTree.satisfiedBy(row, staticRow, true))
-                        clusters.add(row);
-                }
-
-                if (!clusters.isEmpty())
-                    return new PartitionIterator(partition, clusters);
-                else
-                    return UnfilteredRowIterators.noRowsIterator(partition.metadata(),
-                                                                 partition.partitionKey(),
-                                                                 Rows.EMPTY_STATIC_ROW,
-                                                                 partition.partitionLevelDeletion(),
-                                                                 partition.isReverseOrder());
-            }
-        }
-
-        public void close()
-        {
-            if (nextPartition != null)
-                nextPartition.close();
-        }
-
-        public boolean isForThrift()
-        {
-            return controller.isForThrift();
-        }
-
-        public CFMetaData metadata()
-        {
-            return controller.metadata();
-        }
-
         private static class PartitionIterator extends AbstractUnfilteredRowIterator
         {
             private final Iterator<Unfiltered> rows;
 
-            public PartitionIterator(UnfilteredRowIterator partition, Collection<Unfiltered> filteredRows)
+            public PartitionIterator(UnfilteredRowIterator partition, Collection<Unfiltered> content)
             {
                 super(partition.metadata(),
                       partition.partitionKey(),
@@ -247,7 +143,7 @@ public class QueryPlan
                       partition.isReverseOrder(),
                       partition.stats());
 
-                rows = filteredRows.iterator();
+                rows = content.iterator();
             }
 
             @Override
@@ -256,5 +152,21 @@ public class QueryPlan
                 return rows.hasNext() ? rows.next() : endOfData();
             }
         }
+
+        public boolean isForThrift()
+        {
+            return controller.isForThrift();
+        }
+
+        public CFMetaData metadata()
+        {
+            return controller.metadata();
+        }
+
+        public void close()
+        {
+            FileUtils.closeQuietly(operationTree);
+            controller.finish();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
index 35898aa..f0b6bac 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
@@ -46,11 +46,6 @@ public interface SSTableFlushObserver
      *
      * @param unfilteredCluster The unfiltered cluster being added to SSTable.
      */
-    default void nextUnfilteredCluster(Unfiltered unfilteredCluster, long position)
-    {
-        nextUnfilteredCluster(unfilteredCluster);
-    }
-
     void nextUnfilteredCluster(Unfiltered unfilteredCluster);
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 40a84dc..56609b3 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -51,7 +51,8 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -63,7 +64,6 @@ import org.apache.cassandra.io.sstable.metadata.*;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.metrics.StorageMetrics;
-import org.apache.cassandra.net.*;
 import org.apache.cassandra.schema.CachingParams;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -1781,35 +1781,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     }
 
     /**
-     * Reads Clustering Key from the data file of current sstable.
-     *
-     * @param rowPosition start position of given row in the data file
-     * @return Clustering of the row
-     * @throws IOException
-     */
-    public Clustering clusteringAt(long rowPosition) throws IOException
-    {
-        Clustering clustering;
-        try (FileDataInput in = dfile.createReader(rowPosition))
-        {
-            if (in.isEOF())
-                return null;
-
-            int flags = in.readUnsignedByte();
-            int extendedFlags = UnfilteredSerializer.readExtendedFlags(in, flags);
-            boolean isStatic = UnfilteredSerializer.isStatic(extendedFlags);
-
-            if (isStatic)
-                clustering = Clustering.STATIC_CLUSTERING;
-            else
-                // Since this is an internal call, we don't have to take care of protocol versions that use legacy layout
-                clustering = Clustering.serializer.deserialize(in, MessagingService.VERSION_30, header.clusteringTypes());
-        }
-
-        return clustering;
-    }
-
-    /**
      * TODO: Move someplace reusable
      */
     public abstract static class Operator

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 19e29a8..c3139a3 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -21,7 +21,9 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -161,9 +163,7 @@ public class BigTableWriter extends SSTableWriter
             return null;
 
         long startPosition = beforeAppend(key);
-        observers.forEach((o) -> {
-            o.startPartition(key, iwriter.indexFile.position());
-        });
+        observers.forEach((o) -> o.startPartition(key, iwriter.indexFile.position()));
 
         //Reuse the writer for each row
         columnIndexWriter.reset();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/utils/obs/BitUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/BitUtil.java b/src/java/org/apache/cassandra/utils/obs/BitUtil.java
index c438d1b..e04de2b 100644
--- a/src/java/org/apache/cassandra/utils/obs/BitUtil.java
+++ b/src/java/org/apache/cassandra/utils/obs/BitUtil.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.utils.obs;
 /**  A variety of high efficiency bit twiddling routines.
  * @lucene.internal
  */
-public final class BitUtil
+final class BitUtil
 {
 
   /** Returns the number of bits set in the long */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/test/data/legacy-sasi/on-disk-sa-int2.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sasi/on-disk-sa-int2.db b/test/data/legacy-sasi/on-disk-sa-int2.db
deleted file mode 100644
index 71f662f..0000000
Binary files a/test/data/legacy-sasi/on-disk-sa-int2.db and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
index fc5afac..0b4e9e2 100644
--- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
@@ -76,7 +76,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
-import com.google.common.collect.Sets;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 
@@ -93,7 +92,7 @@ public class SASIIndexTest
         PARTITIONER = Murmur3Partitioner.instance;
     }
 
-    private static final String KS_NAME = "sasi_index_test";
+    private static final String KS_NAME = "sasi";
     private static final String CF_NAME = "test_cf";
     private static final String CLUSTERING_CF_NAME_1 = "clustering_test_cf_1";
     private static final String CLUSTERING_CF_NAME_2 = "clustering_test_cf_2";
@@ -449,15 +448,9 @@ public class SASIIndexTest
         if (forceFlush)
             store.forceBlockingFlush();
 
-        CQLTester.assertRowsIgnoringOrder(executeCQL(FTS_CF_NAME, "SELECT * FROM %s.%s WHERE artist LIKE 'lady%%'"),
-                                          CQLTester.row(UUID.fromString("1a4abbcd-b5de-4c69-a578-31231e01ff09"), "Lady Gaga", "Poker Face"),
-                                          CQLTester.row(UUID.fromString("4f8dc18e-54e6-4e16-b507-c5324b61523b"), "Lady Pank", "Zamki na piasku"),
-                                          CQLTester.row(UUID.fromString("eaf294fa-bad5-49d4-8f08-35ba3636a706"), "Lady Pank", "Koncertowa"));
-
-        CQLTester.assertRowsIgnoringOrder(executeCQL(FTS_CF_NAME, "SELECT artist, title FROM %s.%s WHERE artist LIKE 'lady%%'"),
-                                          CQLTester.row("Lady Gaga", "Poker Face"),
-                                          CQLTester.row("Lady Pank", "Zamki na piasku"),
-                                          CQLTester.row("Lady Pank", "Koncertowa"));
+        final UntypedResultSet results = executeCQL(FTS_CF_NAME, "SELECT * FROM %s.%s WHERE artist LIKE 'lady%%'");
+        Assert.assertNotNull(results);
+        Assert.assertEquals(3, results.size());
     }
 
     @Test
@@ -671,7 +664,7 @@ public class SASIIndexTest
                 add("key21");
         }};
 
-        Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys)));
+        Assert.assertEquals(expected, convert(uniqueKeys));
 
         // now let's test a single equals condition
 
@@ -697,7 +690,7 @@ public class SASIIndexTest
                 add("key21");
         }};
 
-        Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys)));
+        Assert.assertEquals(expected, convert(uniqueKeys));
 
         // now let's test something which is smaller than a single page
         uniqueKeys = getPaged(store, 4,
@@ -711,7 +704,7 @@ public class SASIIndexTest
                 add("key07");
         }};
 
-        Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys)));
+        Assert.assertEquals(expected, convert(uniqueKeys));
 
         // the same but with the page size of 2 to test minimal pagination windows
 
@@ -719,7 +712,7 @@ public class SASIIndexTest
                               buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
                               buildExpression(age, Operator.EQ, Int32Type.instance.decompose(36)));
 
-        Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys)));
+        Assert.assertEquals(expected, convert(uniqueKeys));
 
         // and last but not least, test age range query with pagination
         uniqueKeys = getPaged(store, 4,
@@ -743,7 +736,7 @@ public class SASIIndexTest
                 add("key21");
         }};
 
-        Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys)));
+        Assert.assertEquals(expected, convert(uniqueKeys));
 
         Set<String> rows;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java
deleted file mode 100644
index 21ef070..0000000
--- a/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.index.sasi.disk;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class KeyOffsetsTest
-{
-    @Test
-    public void testDuplicates()
-    {
-        KeyOffsets offsets = new KeyOffsets();
-        long[] arr = new long[]{ 1, 2, 3, 4, 5, 6 };
-        offsets.put(1, arr);
-        Assert.assertArrayEquals(offsets.get(1), arr);
-        offsets.put(1, arr);
-        Assert.assertArrayEquals(offsets.get(1), arr);
-        for (long l : arr)
-            offsets.put(1, l);
-        Assert.assertArrayEquals(offsets.get(1), arr);
-
-        for (long l : arr)
-            offsets.put(2, l);
-        Assert.assertArrayEquals(offsets.get(2), arr);
-        offsets.put(2, arr);
-        Assert.assertArrayEquals(offsets.get(2), arr);
-        offsets.put(2, arr);
-        Assert.assertArrayEquals(offsets.get(2), arr);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
index b56cb4e..10dc7a8 100644
--- a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
@@ -24,16 +24,13 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import com.carrotsearch.hppc.cursors.LongObjectCursor;
 import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.BufferDecoratedKey;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.utils.CombinedTerm;
 import org.apache.cassandra.index.sasi.utils.CombinedTermIterator;
-import org.apache.cassandra.index.sasi.utils.KeyConverter;
 import org.apache.cassandra.index.sasi.utils.OnDiskIndexIterator;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -41,8 +38,13 @@ import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.MurmurHash;
 import org.apache.cassandra.utils.Pair;
 
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
+
+import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
 
@@ -85,7 +87,7 @@ public class OnDiskIndexTest
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance);
+        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter());
 
         // first check if we can find exact matches
         for (Map.Entry<ByteBuffer, TokenTreeBuilder> e : data.entrySet())
@@ -93,13 +95,11 @@ public class OnDiskIndexTest
             if (UTF8Type.instance.getString(e.getKey()).equals("cat"))
                 continue; // cat is embedded into scat, we'll test it in next section
 
-            Assert.assertEquals("Key was: " + UTF8Type.instance.compose(e.getKey()),
-                                convert(e.getValue()),
-                                convert(onDisk.search(expressionFor(UTF8Type.instance, e.getKey()))));
+            Assert.assertEquals("Key was: " + UTF8Type.instance.compose(e.getKey()), convert(e.getValue()), convert(onDisk.search(expressionFor(UTF8Type.instance, e.getKey()))));
         }
 
         // check that cat returns positions for scat & cat
-        Assert.assertEquals(convert(1L, 4L), convert(onDisk.search(expressionFor("cat"))));
+        Assert.assertEquals(convert(1, 4), convert(onDisk.search(expressionFor("cat"))));
 
         // random suffix queries
         Assert.assertEquals(convert(9, 10), convert(onDisk.search(expressionFor("ar"))));
@@ -143,7 +143,7 @@ public class OnDiskIndexTest
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, KeyConverter.instance);
+        OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter());
 
         for (Map.Entry<ByteBuffer, TokenTreeBuilder> e : data.entrySet())
         {
@@ -224,14 +224,14 @@ public class OnDiskIndexTest
 
         OnDiskIndexBuilder iterTest = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX);
         for (int i = 0; i < iterCheckNums.size(); i++)
-            iterTest.add(iterCheckNums.get(i), keyAt((long) i), i, i + 5);
+            iterTest.add(iterCheckNums.get(i), keyAt((long) i), i);
 
         File iterIndex = File.createTempFile("sa-iter", ".db");
         iterIndex.deleteOnExit();
 
         iterTest.finish(iterIndex);
 
-        onDisk = new OnDiskIndex(iterIndex, Int32Type.instance, KeyConverter.instance);
+        onDisk = new OnDiskIndex(iterIndex, Int32Type.instance, new KeyConverter());
 
         ByteBuffer number = Int32Type.instance.decompose(1);
         Assert.assertEquals(0, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.ASC, false)));
@@ -283,7 +283,7 @@ public class OnDiskIndexTest
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance);
+        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter());
 
         Assert.assertEquals(convert(1, 2, 3, 4, 5, 6), convert(onDisk.search(expressionFor("liz"))));
         Assert.assertEquals(convert(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), convert(onDisk.search(expressionFor("a"))));
@@ -315,14 +315,14 @@ public class OnDiskIndexTest
         final int numIterations = 100000;
 
         for (long i = 0; i < numIterations; i++)
-            builder.add(LongType.instance.decompose(start + i), keyAt(i), i, clusteringOffset(i));
+            builder.add(LongType.instance.decompose(start + i), keyAt(i), i);
 
         File index = File.createTempFile("on-disk-sa-sparse", "db");
         index.deleteOnExit();
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, KeyConverter.instance);
+        OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, new KeyConverter());
 
         ThreadLocalRandom random = ThreadLocalRandom.current();
 
@@ -343,9 +343,9 @@ public class OnDiskIndexTest
             if (upperInclusive)
                 upperKey += 1;
 
-            Set<RowKey> actual = convert(rows);
+            Set<DecoratedKey> actual = convert(rows);
             for (long key = lowerKey; key < upperKey; key++)
-                Assert.assertTrue("key" + key + " wasn't found", actual.contains(new RowKey(keyAt(key), ck(clusteringOffset(key)), CLUSTERING_COMPARATOR)));
+                Assert.assertTrue("key" + key + " wasn't found", actual.contains(keyAt(key)));
 
             Assert.assertEquals((upperKey - lowerKey), actual.size());
         }
@@ -353,7 +353,7 @@ public class OnDiskIndexTest
         // let's also explicitly test whole range search
         RangeIterator<Long, Token> rows = onDisk.search(expressionFor(start, true, start + numIterations, true));
 
-        Set<RowKey> actual = convert(rows);
+        Set<DecoratedKey> actual = convert(rows);
         Assert.assertEquals(numIterations, actual.size());
     }
 
@@ -380,7 +380,7 @@ public class OnDiskIndexTest
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance);
+        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter());
 
         // test whole words first
         Assert.assertEquals(convert(3, 4, 5, 6, 7, 8, 9, 10), convert(onDisk.search(expressionForNot("Aleksey", "Vijay", "Pavel"))));
@@ -424,7 +424,7 @@ public class OnDiskIndexTest
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, KeyConverter.instance);
+        OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter());
 
         Assert.assertEquals(convert(1, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12), convert(onDisk.search(expressionForNot(0, 10, 1))));
         Assert.assertEquals(convert(1, 2, 4, 5, 7, 9, 10, 11, 12), convert(onDisk.search(expressionForNot(0, 10, 1, 8))));
@@ -439,16 +439,16 @@ public class OnDiskIndexTest
         final long lower = 0;
         final long upper = 100000;
 
-        OnDiskIndexBuilder builder = new OnDiskIndexBuilder(LongType.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE);
+        OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE);
         for (long i = lower; i <= upper; i++)
-            builder.add(LongType.instance.decompose(i), keyAt(i), i, clusteringOffset(i));
+            builder.add(LongType.instance.decompose(i), keyAt(i), i);
 
         File index = File.createTempFile("on-disk-sa-except-long-ranges", "db");
         index.deleteOnExit();
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, KeyConverter.instance);
+        OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, new KeyConverter());
 
         ThreadLocalRandom random = ThreadLocalRandom.current();
 
@@ -503,10 +503,10 @@ public class OnDiskIndexTest
     private int validateExclusions(OnDiskIndex sa, long lower, long upper, Set<Long> exclusions, boolean checkCount)
     {
         int count = 0;
-        for (RowKey key : convert(sa.search(rangeWithExclusions(lower, true, upper, true, exclusions))))
+        for (DecoratedKey key : convert(sa.search(rangeWithExclusions(lower, true, upper, true, exclusions))))
         {
-            long keyId = LongType.instance.compose(key.decoratedKey.getKey());
-            Assert.assertFalse("key" + keyId + " is present.", exclusions.contains(keyId));
+            String keyId = UTF8Type.instance.getString(key.getKey()).split("key")[1];
+            Assert.assertFalse("key" + keyId + " is present.", exclusions.contains(Long.valueOf(keyId)));
             count++;
         }
 
@@ -519,49 +519,40 @@ public class OnDiskIndexTest
     @Test
     public void testDescriptor() throws Exception
     {
-        final Map<ByteBuffer, Pair<RowKey, Long>> data = new HashMap<ByteBuffer, Pair<RowKey, Long>>()
+        final Map<ByteBuffer, Pair<DecoratedKey, Long>> data = new HashMap<ByteBuffer, Pair<DecoratedKey, Long>>()
         {{
-                put(Int32Type.instance.decompose(5), Pair.create(new RowKey(keyAt(1L), ck(clusteringOffset(1L)), CLUSTERING_COMPARATOR) , 1L));
+                put(Int32Type.instance.decompose(5), Pair.create(keyAt(1L), 1L));
         }};
 
         OnDiskIndexBuilder builder1 = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX);
-        for (Map.Entry<ByteBuffer, Pair<RowKey, Long>> e : data.entrySet())
+        OnDiskIndexBuilder builder2 = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX);
+        for (Map.Entry<ByteBuffer, Pair<DecoratedKey, Long>> e : data.entrySet())
         {
-            DecoratedKey key = e.getValue().left.decoratedKey;
+            DecoratedKey key = e.getValue().left;
             Long position = e.getValue().right;
 
-            builder1.add(e.getKey(), key, position, clusteringOffset(position));
+            builder1.add(e.getKey(), key, position);
+            builder2.add(e.getKey(), key, position);
         }
 
         File index1 = File.createTempFile("on-disk-sa-int", "db");
-
+        File index2 = File.createTempFile("on-disk-sa-int2", "db");
         index1.deleteOnExit();
+        index2.deleteOnExit();
 
         builder1.finish(index1);
-        OnDiskIndex onDisk1 = new OnDiskIndex(index1, Int32Type.instance, KeyConverter.instance);
-        ByteBuffer number = Int32Type.instance.decompose(5);
-        Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk1.search(expressionFor(Operator.EQ, Int32Type.instance, number))));
-        Assert.assertEquals(onDisk1.descriptor.version, Descriptor.CURRENT_VERSION);
-    }
-
+        builder2.finish(new Descriptor(Descriptor.VERSION_AA), index2);
 
-    static final String DATA_DIR = "test/data/legacy-sasi/";
-
-    @Test
-    public void testLegacyDescriptor() throws Exception
-    {
-        final Map<ByteBuffer, Pair<RowKey, Long>> data = new HashMap<ByteBuffer, Pair<RowKey, Long>>()
-        {{
-            put(Int32Type.instance.decompose(5), Pair.create(new RowKey(keyAt(1L), ck(KeyOffsets.NO_OFFSET), CLUSTERING_COMPARATOR) , 1L));
-        }};
-
-        File index2 = new File(DATA_DIR + "on-disk-sa-int2.db");
-        OnDiskIndex onDisk2 = new OnDiskIndex(index2, Int32Type.instance, KeyConverter.instance);
+        OnDiskIndex onDisk1 = new OnDiskIndex(index1, Int32Type.instance, new KeyConverter());
+        OnDiskIndex onDisk2 = new OnDiskIndex(index2, Int32Type.instance, new KeyConverter());
 
         ByteBuffer number = Int32Type.instance.decompose(5);
+
+        Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk1.search(expressionFor(Operator.EQ, Int32Type.instance, number))));
         Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk2.search(expressionFor(Operator.EQ, Int32Type.instance, number))));
 
-        Assert.assertEquals(onDisk2.descriptor.version, Descriptor.VERSION_AA);
+        Assert.assertEquals(onDisk1.descriptor.version.version, Descriptor.CURRENT_VERSION);
+        Assert.assertEquals(onDisk2.descriptor.version.version, Descriptor.VERSION_AA);
     }
 
     @Test
@@ -583,7 +574,7 @@ public class OnDiskIndexTest
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, KeyConverter.instance);
+        OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter());
         OnDiskIndex.OnDiskSuperBlock superBlock = onDisk.dataLevel.getSuperBlock(0);
         Iterator<Token> iter = superBlock.iterator();
 
@@ -604,14 +595,14 @@ public class OnDiskIndexTest
     {
         OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE);
         for (long i = 0; i < 100000; i++)
-            builder.add(LongType.instance.decompose(i), keyAt(i), i, clusteringOffset(i));
+            builder.add(LongType.instance.decompose(i), keyAt(i), i);
 
         File index = File.createTempFile("on-disk-sa-multi-superblock-match", ".db");
         index.deleteOnExit();
 
         builder.finish(index);
 
-        OnDiskIndex onDiskIndex = new OnDiskIndex(index, LongType.instance, KeyConverter.instance);
+        OnDiskIndex onDiskIndex = new OnDiskIndex(index, LongType.instance, new KeyConverter());
 
         testSearchRangeWithSuperBlocks(onDiskIndex, 0, 500);
         testSearchRangeWithSuperBlocks(onDiskIndex, 300, 93456);
@@ -626,9 +617,9 @@ public class OnDiskIndexTest
         }
     }
 
-    public void putAll(SortedMap<Long, KeyOffsets> offsets, TokenTreeBuilder ttb)
+    public void putAll(SortedMap<Long, LongSet> offsets, TokenTreeBuilder ttb)
     {
-        for (Pair<Long, KeyOffsets> entry : ttb)
+        for (Pair<Long, LongSet> entry : ttb)
             offsets.put(entry.left, entry.right);
     }
 
@@ -638,26 +629,26 @@ public class OnDiskIndexTest
         OnDiskIndexBuilder builderA = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX);
         OnDiskIndexBuilder builderB = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX);
 
-        TreeMap<Long, TreeMap<Long, KeyOffsets>> expected = new TreeMap<>();
+        TreeMap<Long, TreeMap<Long, LongSet>> expected = new TreeMap<>();
 
         for (long i = 0; i <= 100; i++)
         {
-            TreeMap<Long, KeyOffsets> offsets = expected.get(i);
+            TreeMap<Long, LongSet> offsets = expected.get(i);
             if (offsets == null)
                 expected.put(i, (offsets = new TreeMap<>()));
 
-            builderA.add(LongType.instance.decompose(i), keyAt(i), i, clusteringOffset(i));
+            builderA.add(LongType.instance.decompose(i), keyAt(i), i);
             putAll(offsets, keyBuilder(i));
         }
 
         for (long i = 50; i < 100; i++)
         {
-            TreeMap<Long, KeyOffsets> offsets = expected.get(i);
+            TreeMap<Long, LongSet> offsets = expected.get(i);
             if (offsets == null)
                 expected.put(i, (offsets = new TreeMap<>()));
 
             long position = 100L + i;
-            builderB.add(LongType.instance.decompose(i), keyAt(position), position, clusteringOffset(position));
+            builderB.add(LongType.instance.decompose(i), keyAt(position), position);
             putAll(offsets, keyBuilder(100L + i));
         }
 
@@ -670,19 +661,19 @@ public class OnDiskIndexTest
         builderA.finish(indexA);
         builderB.finish(indexB);
 
-        OnDiskIndex a = new OnDiskIndex(indexA, LongType.instance, KeyConverter.instance);
-        OnDiskIndex b = new OnDiskIndex(indexB, LongType.instance, KeyConverter.instance);
+        OnDiskIndex a = new OnDiskIndex(indexA, LongType.instance, new KeyConverter());
+        OnDiskIndex b = new OnDiskIndex(indexB, LongType.instance, new KeyConverter());
 
         RangeIterator<OnDiskIndex.DataTerm, CombinedTerm> union = OnDiskIndexIterator.union(a, b);
 
-        TreeMap<Long, TreeMap<Long, KeyOffsets>> actual = new TreeMap<>();
+        TreeMap<Long, TreeMap<Long, LongSet>> actual = new TreeMap<>();
         while (union.hasNext())
         {
             CombinedTerm term = union.next();
 
             Long composedTerm = LongType.instance.compose(term.getTerm());
 
-            TreeMap<Long, KeyOffsets> offsets = actual.get(composedTerm);
+            TreeMap<Long, LongSet> offsets = actual.get(composedTerm);
             if (offsets == null)
                 actual.put(composedTerm, (offsets = new TreeMap<>()));
 
@@ -697,7 +688,7 @@ public class OnDiskIndexTest
         OnDiskIndexBuilder combined = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX);
         combined.finish(Pair.create(keyAt(0).getKey(), keyAt(100).getKey()), indexC, new CombinedTermIterator(a, b));
 
-        OnDiskIndex c = new OnDiskIndex(indexC, LongType.instance, KeyConverter.instance);
+        OnDiskIndex c = new OnDiskIndex(indexC, LongType.instance, new KeyConverter());
         union = OnDiskIndexIterator.union(c);
         actual.clear();
 
@@ -707,7 +698,7 @@ public class OnDiskIndexTest
 
             Long composedTerm = LongType.instance.compose(term.getTerm());
 
-            TreeMap<Long, KeyOffsets> offsets = actual.get(composedTerm);
+            TreeMap<Long, LongSet> offsets = actual.get(composedTerm);
             if (offsets == null)
                 actual.put(composedTerm, (offsets = new TreeMap<>()));
 
@@ -747,7 +738,7 @@ public class OnDiskIndexTest
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance);
+        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter());
 
         // check that lady% return lady gaga (1) and lady pank (3) but not lady of bells(2)
         Assert.assertEquals(convert(1, 3), convert(onDisk.search(expressionFor("lady", Operator.LIKE_PREFIX))));
@@ -771,7 +762,7 @@ public class OnDiskIndexTest
         while (tokens.hasNext())
         {
             Token token = tokens.next();
-            Iterator<RowKey> keys = token.iterator();
+            Iterator<DecoratedKey> keys = token.iterator();
 
             // each of the values should have exactly a single key
             Assert.assertTrue(keys.hasNext());
@@ -780,7 +771,7 @@ public class OnDiskIndexTest
 
             // and it's last should always smaller than current
             if (lastToken != null)
-                Assert.assertTrue("last should be less than current", lastToken < token.get());
+                Assert.assertTrue("last should be less than current", lastToken.compareTo(token.get()) < 0);
 
             lastToken = token.get();
             keyCount++;
@@ -789,84 +780,61 @@ public class OnDiskIndexTest
         Assert.assertEquals(end - start, keyCount);
     }
 
-    private static DecoratedKey keyAt(long partitionOffset)
-    {
-        return KeyConverter.dk(partitionOffset);
-    }
-
-    private static Clustering ck(long rowOffset)
-    {
-        return KeyConverter.ck(rowOffset);
-    }
-
-    private TokenTreeBuilder keyBuilder(long... offsets)
+    private static DecoratedKey keyAt(long rawKey)
     {
-        TokenTreeBuilder builder = new DynamicTokenTreeBuilder();
-
-        for (final long pkOffset : offsets)
-        {
-            DecoratedKey k = keyAt(pkOffset);
-            builder.add((Long) k.getToken().getTokenValue(), pkOffset, clusteringOffset(pkOffset));
-        }
-
-        return builder.finish();
+        ByteBuffer key = ByteBuffer.wrap(("key" + rawKey).getBytes());
+        return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(MurmurHash.hash2_64(key, key.position(), key.remaining(), 0)), key);
     }
 
-    private static long clusteringOffset(long offset)
-    {
-        return offset + 100;
-    }
-
-    private TokenTreeBuilder keyBuilder(Pair<Long, Long>... offsets)
+    private static TokenTreeBuilder keyBuilder(Long... keys)
     {
         TokenTreeBuilder builder = new DynamicTokenTreeBuilder();
 
-        for (final Pair<Long,Long> key : offsets)
+        for (final Long key : keys)
         {
-            DecoratedKey k = keyAt(key.left);
-            builder.add((Long) k.getToken().getTokenValue(), key.left, key.right);
+            DecoratedKey dk = keyAt(key);
+            builder.add((Long) dk.getToken().getTokenValue(), key);
         }
 
         return builder.finish();
     }
 
-    private static final ClusteringComparator CLUSTERING_COMPARATOR = new ClusteringComparator(BytesType.instance);
-
-    private static Set<RowKey> convert(TokenTreeBuilder offsets)
+    private static Set<DecoratedKey> convert(TokenTreeBuilder offsets)
     {
-        Set<RowKey> result = new HashSet<>();
+        Set<DecoratedKey> result = new HashSet<>();
 
-        Iterator<Pair<Long, KeyOffsets>> offsetIter = offsets.iterator();
+        Iterator<Pair<Long, LongSet>> offsetIter = offsets.iterator();
         while (offsetIter.hasNext())
         {
-            Pair<Long, KeyOffsets> pair = offsetIter.next();
+            LongSet v = offsetIter.next().right;
 
-            for (LongObjectCursor<long[]> cursor : pair.right)
-                for (long l : cursor.value)
-                    result.add(new RowKey(keyAt(cursor.key), ck(l), CLUSTERING_COMPARATOR));
+            for (LongCursor offset : v)
+                result.add(keyAt(offset.value));
         }
         return result;
     }
 
-    private static Set<RowKey> convert(long... keyOffsets)
+    private static Set<DecoratedKey> convert(long... keyOffsets)
     {
-        Set<RowKey> result = new HashSet<>();
-        for (final long offset : keyOffsets)
-            result.add(new RowKey(keyAt(offset), ck(clusteringOffset(offset)), CLUSTERING_COMPARATOR));
+        Set<DecoratedKey> result = new HashSet<>();
+        for (long offset : keyOffsets)
+            result.add(keyAt(offset));
 
         return result;
     }
 
-    private static Set<RowKey> convert(RangeIterator<Long, Token> results)
+    private static Set<DecoratedKey> convert(RangeIterator<Long, Token> results)
     {
         if (results == null)
             return Collections.emptySet();
 
-        Set<RowKey> keys = new TreeSet<>();
+        Set<DecoratedKey> keys = new TreeSet<>(DecoratedKey.comparator);
 
         while (results.hasNext())
-            for (RowKey key: results.next())
+        {
+            for (DecoratedKey key : results.next())
                 keys.add(key);
+        }
 
         return keys;
     }
@@ -940,11 +908,19 @@ public class OnDiskIndexTest
 
     private static void addAll(OnDiskIndexBuilder builder, ByteBuffer term, TokenTreeBuilder tokens)
     {
-        for (Pair<Long, KeyOffsets> token : tokens)
+        for (Pair<Long, LongSet> token : tokens)
+        {
+            for (long position : token.right.toArray())
+                builder.add(term, keyAt(position), position);
+        }
+    }
+
+    private static class KeyConverter implements Function<Long, DecoratedKey>
+    {
+        @Override
+        public DecoratedKey apply(Long offset)
         {
-            for (LongObjectCursor<long[]> cursor : token.right)
-                for (long clusteringOffset : cursor.value)
-                    builder.add(term, keyAt(cursor.key), cursor.key, clusteringOffset);
+            return keyAt(offset);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
index 61e4d67..f19d962 100644
--- a/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
@@ -22,14 +22,11 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.Supplier;
 
-import com.carrotsearch.hppc.cursors.LongObjectCursor;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
@@ -38,9 +35,6 @@ import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.rows.BTreeRow;
 import org.apache.cassandra.db.rows.BufferCell;
 import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.index.sasi.KeyFetcher;
 import org.apache.cassandra.index.sasi.SASIIndex;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 import org.apache.cassandra.db.marshal.Int32Type;
@@ -76,8 +70,6 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
                                                                      Tables.of(SchemaLoader.sasiCFMD(KS_NAME, CF_NAME))));
     }
 
-    private static final ClusteringComparator CLUSTERING_COMPARATOR = new ClusteringComparator(LongType.instance);
-
     @Test
     public void testPartialIndexWrites() throws Exception
     {
@@ -94,20 +86,19 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         Descriptor descriptor = Descriptor.fromFilename(cfs.getSSTablePath(directory));
         PerSSTableIndexWriter indexWriter = (PerSSTableIndexWriter) sasi.getFlushObserver(descriptor, OperationType.FLUSH);
 
-        SortedMap<RowKey, Row> expectedKeys = new TreeMap<>();
+        SortedMap<DecoratedKey, Row> expectedKeys = new TreeMap<>(DecoratedKey.comparator);
 
         for (int i = 0; i < maxKeys; i++)
         {
             ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, i));
-            Clustering clustering = Clustering.make(ByteBufferUtil.bytes(i * 1L));
-            expectedKeys.put(new RowKey(cfs.metadata.partitioner.decorateKey(key), clustering, CLUSTERING_COMPARATOR),
-                             BTreeRow.singleCellRow(clustering,
+            expectedKeys.put(cfs.metadata.partitioner.decorateKey(key),
+                             BTreeRow.singleCellRow(Clustering.EMPTY,
                                                     BufferCell.live(column, timestamp, Int32Type.instance.decompose(i))));
         }
 
         indexWriter.begin();
 
-        Iterator<Map.Entry<RowKey, Row>> keyIterator = expectedKeys.entrySet().iterator();
+        Iterator<Map.Entry<DecoratedKey, Row>> keyIterator = expectedKeys.entrySet().iterator();
         long position = 0;
 
         Set<String> segments = new HashSet<>();
@@ -119,11 +110,10 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
                 if (!keyIterator.hasNext())
                     break outer;
 
-                Map.Entry<RowKey, Row> key = keyIterator.next();
+                Map.Entry<DecoratedKey, Row> key = keyIterator.next();
 
-                indexWriter.startPartition(key.getKey().decoratedKey, position);
-                indexWriter.nextUnfilteredCluster(key.getValue(), position);
-                position++;
+                indexWriter.startPartition(key.getKey(), position++);
+                indexWriter.nextUnfilteredCluster(key.getValue());
             }
 
             PerSSTableIndexWriter.Index index = indexWriter.getIndex(column);
@@ -144,12 +134,15 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         for (String segment : segments)
             Assert.assertFalse(new File(segment).exists());
 
-        OnDiskIndex index = new OnDiskIndex(new File(indexFile), Int32Type.instance, new FakeKeyFetcher(cfs, keyFormat));
+        OnDiskIndex index = new OnDiskIndex(new File(indexFile), Int32Type.instance, keyPosition -> {
+            ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, keyPosition));
+            return cfs.metadata.partitioner.decorateKey(key);
+        });
 
         Assert.assertEquals(0, UTF8Type.instance.compare(index.minKey(), ByteBufferUtil.bytes(String.format(keyFormat, 0))));
         Assert.assertEquals(0, UTF8Type.instance.compare(index.maxKey(), ByteBufferUtil.bytes(String.format(keyFormat, maxKeys - 1))));
 
-        Set<RowKey> actualKeys = new HashSet<>();
+        Set<DecoratedKey> actualKeys = new HashSet<>();
         int count = 0;
         for (OnDiskIndex.DataTerm term : index)
         {
@@ -157,7 +150,7 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
 
             while (tokens.hasNext())
             {
-                for (RowKey key : tokens.next())
+                for (DecoratedKey key : tokens.next())
                     actualKeys.add(key);
             }
 
@@ -165,8 +158,8 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         }
 
         Assert.assertEquals(expectedKeys.size(), actualKeys.size());
-        for (RowKey key : expectedKeys.keySet())
-            Assert.assertTrue("Key was not present : " + key, actualKeys.contains(key));
+        for (DecoratedKey key : expectedKeys.keySet())
+            Assert.assertTrue(actualKeys.contains(key));
 
         FileUtils.closeQuietly(index);
     }
@@ -190,14 +183,11 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         indexWriter.begin();
         indexWriter.indexes.put(column, indexWriter.newIndex(sasi.getIndex()));
 
-        populateSegment(cfs.metadata, indexWriter.getIndex(column), new HashMap<Long, KeyOffsets>()
+        populateSegment(cfs.metadata, indexWriter.getIndex(column), new HashMap<Long, Set<Integer>>()
         {{
-            put(now,     new KeyOffsets() {{ put(0, 0); put(1, 1); }});
-            put(now + 1, new KeyOffsets() {{ put(2, 2); put(3, 3); }});
-            put(now + 2, new KeyOffsets() {{
-                put(4, 4); put(5, 5); put(6, 6);
-                put(7, 7); put(8, 8); put(9, 9);
-            }});
+            put(now,     new HashSet<>(Arrays.asList(0, 1)));
+            put(now + 1, new HashSet<>(Arrays.asList(2, 3)));
+            put(now + 2, new HashSet<>(Arrays.asList(4, 5, 6, 7, 8, 9)));
         }});
 
         Callable<OnDiskIndex> segmentBuilder = indexWriter.getIndex(column).scheduleSegmentFlush(false);
@@ -207,21 +197,15 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         PerSSTableIndexWriter.Index index = indexWriter.getIndex(column);
         Random random = ThreadLocalRandom.current();
 
-        Supplier<KeyOffsets> offsetSupplier = () -> new KeyOffsets() {{
-            put(random.nextInt(), random.nextInt());
-            put(random.nextInt(), random.nextInt());
-            put(random.nextInt(), random.nextInt());
-        }};
-
         Set<String> segments = new HashSet<>();
         // now let's test multiple correct segments with yield incorrect final segment
         for (int i = 0; i < 3; i++)
         {
-            populateSegment(cfs.metadata, index, new HashMap<Long, KeyOffsets>()
+            populateSegment(cfs.metadata, index, new HashMap<Long, Set<Integer>>()
             {{
-                put(now,     offsetSupplier.get());
-                put(now + 1, offsetSupplier.get());
-                put(now + 2, offsetSupplier.get());
+                put(now,     new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(), random.nextInt())));
+                put(now + 1, new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(), random.nextInt())));
+                put(now + 2, new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(), random.nextInt())));
             }});
 
             try
@@ -252,56 +236,16 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         Assert.assertFalse(new File(index.outputFile).exists());
     }
 
-    private static void populateSegment(CFMetaData metadata, PerSSTableIndexWriter.Index index, Map<Long, KeyOffsets> data)
+    private static void populateSegment(CFMetaData metadata, PerSSTableIndexWriter.Index index, Map<Long, Set<Integer>> data)
     {
-        for (Map.Entry<Long, KeyOffsets> value : data.entrySet())
+        for (Map.Entry<Long, Set<Integer>> value : data.entrySet())
         {
             ByteBuffer term = LongType.instance.decompose(value.getKey());
-            for (LongObjectCursor<long[]> cursor : value.getValue())
+            for (Integer keyPos : value.getValue())
             {
-                ByteBuffer key = ByteBufferUtil.bytes(String.format("key%06d", cursor.key));
-                for (long rowOffset : cursor.value)
-                {
-                    index.add(term,
-                              metadata.partitioner.decorateKey(key),
-                              ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1),
-                              ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1));
-                }
+                ByteBuffer key = ByteBufferUtil.bytes(String.format("key%06d", keyPos));
+                index.add(term, metadata.partitioner.decorateKey(key), ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1));
             }
         }
     }
-
-    private final class FakeKeyFetcher implements KeyFetcher
-    {
-        private final ColumnFamilyStore cfs;
-        private final String keyFormat;
-
-        public FakeKeyFetcher(ColumnFamilyStore cfs, String keyFormat)
-        {
-            this.cfs = cfs;
-            this.keyFormat = keyFormat;
-        }
-
-        public DecoratedKey getPartitionKey(long keyPosition)
-        {
-            ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, keyPosition));
-            return cfs.metadata.partitioner.decorateKey(key);
-        }
-
-        public Clustering getClustering(long offset)
-        {
-            return Clustering.make(ByteBufferUtil.bytes(offset));
-        }
-
-        public RowKey getRowKey(long partitionOffset, long rowOffset)
-        {
-            return new RowKey(getPartitionKey(partitionOffset), getClustering(rowOffset), CLUSTERING_COMPARATOR);
-        }
-    }
-
-    public IPartitioner getPartitioner()
-    {
-        return Murmur3Partitioner.instance;
-    }
-
 }