You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2016/09/06 05:19:18 UTC

[2/3] cassandra git commit: Add row offset support to SASI

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
index 2210964..07804d6 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
@@ -20,28 +20,62 @@ package org.apache.cassandra.index.sasi.disk;
 import java.io.IOException;
 import java.util.*;
 
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.obs.BitUtil;
 
-import com.carrotsearch.hppc.LongSet;
-
-public interface TokenTreeBuilder extends Iterable<Pair<Long, LongSet>>
+public interface TokenTreeBuilder extends Iterable<Pair<Long, KeyOffsets>>
 {
-    int BLOCK_BYTES = 4096;
-    int BLOCK_HEADER_BYTES = 64;
-    int OVERFLOW_TRAILER_BYTES = 64;
-    int OVERFLOW_TRAILER_CAPACITY = OVERFLOW_TRAILER_BYTES / 8;
-    int TOKENS_PER_BLOCK = (BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / 16;
-    long MAX_OFFSET = (1L << 47) - 1; // 48 bits for (signed) offset
-    byte LAST_LEAF_SHIFT = 1;
-    byte SHARED_HEADER_BYTES = 19;
-    byte ENTRY_TYPE_MASK = 0x03;
-    short AB_MAGIC = 0x5A51;
+    final static int BLOCK_BYTES = 4096;
+
+    final static int LEAF_ENTRY_TYPE_BYTES = Short.BYTES;
+    final static int TOKEN_OFFSET_BYTES = LEAF_ENTRY_TYPE_BYTES;
+    final static int LEAF_PARTITON_OFFSET_BYTES = Long.BYTES;
+    final static int LEAF_ROW_OFFSET_BYTES = Long.BYTES;
+
+    final static int LEAF_PARTITON_OFFSET_PACKED_BYTES = Integer.BYTES;
+    final static int LEAF_ROW_OFFSET_PACKED_BYTES = Integer.BYTES;
+    final static int COLLISION_ENTRY_BYTES = LEAF_PARTITON_OFFSET_BYTES + LEAF_ROW_OFFSET_BYTES;
+
+    final static int HEADER_INFO_BYTE_BYTES = Byte.BYTES;
+    final static int HEADER_TOKEN_COUNT_BYTES = Short.BYTES;
+
+    final static int ROOT_HEADER_MAGIC_SIZE = Short.BYTES;
+    final static int ROOT_HEADER_TOKEN_COUNT_SIZE = Long.BYTES;
+
+    // Partitioner token size in bytes
+    final static int TOKEN_BYTES = Long.BYTES;
+
+    // Leaf entry size in bytes, see {@link SimpleLeafEntry} for a full description
+    final static int LEAF_ENTRY_BYTES = LEAF_ENTRY_TYPE_BYTES + TOKEN_BYTES + LEAF_PARTITON_OFFSET_BYTES + LEAF_ROW_OFFSET_BYTES;
+    // Shared header size in bytes, see {@link AbstractTreeBuilder$Header} for a full description
+    final static int SHARED_HEADER_BYTES = HEADER_INFO_BYTE_BYTES + HEADER_TOKEN_COUNT_BYTES + 2 * TOKEN_BYTES;
+    // Block header size in bytes, see {@link AbstractTreeBuilder$RootHeader}
+    final static int BLOCK_HEADER_BYTES = BitUtil.nextHighestPowerOfTwo(SHARED_HEADER_BYTES + ROOT_HEADER_MAGIC_SIZE + ROOT_HEADER_TOKEN_COUNT_SIZE + 2 * TOKEN_BYTES);
+
+    // Overflow trailer capacity is currently 8 overflow items. Each overflow item consists of two longs.
+    final static int OVERFLOW_TRAILER_CAPACITY = 8;
+    final static int OVERFLOW_TRAILER_BYTES = OVERFLOW_TRAILER_CAPACITY * COLLISION_ENTRY_BYTES;;
+    final static int TOKENS_PER_BLOCK = (TokenTreeBuilder.BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / LEAF_ENTRY_BYTES;
+
+    final static int LEGACY_LEAF_ENTRY_BYTES = Short.BYTES + Short.BYTES + TOKEN_BYTES + Integer.BYTES;
+    final static int LEGACY_TOKEN_OFFSET_BYTES = 2 * Short.BYTES;
+    final static byte LAST_LEAF_SHIFT = 1;
+
+    /**
+     * Mask selecting the two lowest bits; presumably these hold the {@link EntryType} ordinal (TODO confirm).
+     */
+    final byte ENTRY_TYPE_MASK = 0x03;
+    final short AB_MAGIC = 0x5A51;
+    final short AC_MAGIC = 0x7C63;
 
     // note: ordinal positions are used here, do not change order
     enum EntryType
     {
-        SIMPLE, FACTORED, PACKED, OVERFLOW;
+        SIMPLE,
+        FACTORED,
+        PACKED,
+        OVERFLOW;
 
         public static EntryType of(int ordinal)
         {
@@ -61,9 +95,9 @@ public interface TokenTreeBuilder extends Iterable<Pair<Long, LongSet>>
         }
     }
 
-    void add(Long token, long keyPosition);
-    void add(SortedMap<Long, LongSet> data);
-    void add(Iterator<Pair<Long, LongSet>> data);
+    void add(Long token, long partitionOffset, long rowOffset);
+    void add(SortedMap<Long, KeyOffsets> data);
+    void add(Iterator<Pair<Long, KeyOffsets>> data);
     void add(TokenTreeBuilder ttb);
 
     boolean isEmpty();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
index e55a806..a7b22f3 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
@@ -19,14 +19,14 @@ package org.apache.cassandra.index.sasi.memory;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.*;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 import org.apache.cassandra.index.sasi.utils.TypeUtil;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +42,7 @@ public class IndexMemtable
         this.index = MemIndex.forColumn(columnIndex.keyValidator(), columnIndex);
     }
 
-    public long index(DecoratedKey key, ByteBuffer value)
+    public long index(RowKey key, ByteBuffer value)
     {
         if (value == null || value.remaining() == 0)
             return 0;
@@ -55,7 +55,7 @@ public class IndexMemtable
             {
                 logger.error("Can't add column {} to index for key: {}, value size {}, validator: {}.",
                              index.columnIndex.getColumnName(),
-                             index.columnIndex.keyValidator().getString(key.getKey()),
+                             index.columnIndex.keyValidator().getString(key.decoratedKey.getKey()),
                              FBUtilities.prettyPrintMemory(size),
                              validator);
                 return 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
index a2f2c0e..b4365dc 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
@@ -18,28 +18,27 @@
 package org.apache.cassandra.index.sasi.memory;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.concurrent.ConcurrentSkipListSet;
 
-import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.index.sasi.disk.*;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.utils.AbstractIterator;
 import org.apache.cassandra.index.sasi.utils.CombinedValue;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 
-import com.carrotsearch.hppc.LongOpenHashSet;
-import com.carrotsearch.hppc.LongSet;
 import com.google.common.collect.PeekingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class KeyRangeIterator extends RangeIterator<Long, Token>
 {
     private final DKIterator iterator;
 
-    public KeyRangeIterator(ConcurrentSkipListSet<DecoratedKey> keys)
+    public KeyRangeIterator(ConcurrentSkipListSet<RowKey> keys)
     {
-        super((Long) keys.first().getToken().getTokenValue(), (Long) keys.last().getToken().getTokenValue(), keys.size());
+        super((Long) keys.first().decoratedKey.getToken().getTokenValue(), (Long) keys.last().decoratedKey.getToken().getTokenValue(), keys.size());
         this.iterator = new DKIterator(keys.iterator());
     }
 
@@ -52,8 +51,8 @@ public class KeyRangeIterator extends RangeIterator<Long, Token>
     {
         while (iterator.hasNext())
         {
-            DecoratedKey key = iterator.peek();
-            if (Long.compare((long) key.getToken().getTokenValue(), nextToken) >= 0)
+            RowKey key = iterator.peek();
+            if (Long.compare((Long) key.decoratedKey.getToken().getTokenValue(), nextToken) >= 0)
                 break;
 
             // consume smaller key
@@ -64,16 +63,16 @@ public class KeyRangeIterator extends RangeIterator<Long, Token>
     public void close() throws IOException
     {}
 
-    private static class DKIterator extends AbstractIterator<DecoratedKey> implements PeekingIterator<DecoratedKey>
+    private static class DKIterator extends AbstractIterator<RowKey> implements PeekingIterator<RowKey>
     {
-        private final Iterator<DecoratedKey> keys;
+        private final Iterator<RowKey> keys;
 
-        public DKIterator(Iterator<DecoratedKey> keys)
+        public DKIterator(Iterator<RowKey> keys)
         {
             this.keys = keys;
         }
 
-        protected DecoratedKey computeNext()
+        protected RowKey computeNext()
         {
             return keys.hasNext() ? keys.next() : endOfData();
         }
@@ -81,25 +80,21 @@ public class KeyRangeIterator extends RangeIterator<Long, Token>
 
     private static class DKToken extends Token
     {
-        private final SortedSet<DecoratedKey> keys;
+        private final SortedSet<RowKey> keys;
 
-        public DKToken(final DecoratedKey key)
+        public DKToken(RowKey key)
         {
-            super((long) key.getToken().getTokenValue());
+            super((Long) key.decoratedKey.getToken().getTokenValue());
 
-            keys = new TreeSet<DecoratedKey>(DecoratedKey.comparator)
+            keys = new TreeSet<RowKey>(RowKey.COMPARATOR)
             {{
                 add(key);
             }};
         }
 
-        public LongSet getOffsets()
+        public KeyOffsets getOffsets()
         {
-            LongSet offsets = new LongOpenHashSet(4);
-            for (DecoratedKey key : keys)
-                offsets.add((long) key.getToken().getTokenValue());
-
-            return offsets;
+            throw new IllegalStateException("DecoratedKey tokens are used in memtables and do not have on-disk offsets");
         }
 
         public void merge(CombinedValue<Long> other)
@@ -116,14 +111,14 @@ public class KeyRangeIterator extends RangeIterator<Long, Token>
             }
             else
             {
-                for (DecoratedKey key : o)
+                for (RowKey key : o)
                     keys.add(key);
             }
         }
 
-        public Iterator<DecoratedKey> iterator()
+        public Iterator<RowKey> iterator()
         {
             return keys.iterator();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
index cc1eb3f..bfba4cb 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
@@ -19,8 +19,8 @@ package org.apache.cassandra.index.sasi.memory;
 
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.*;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
@@ -37,7 +37,7 @@ public abstract class MemIndex
         this.columnIndex = columnIndex;
     }
 
-    public abstract long add(DecoratedKey key, ByteBuffer value);
+    public abstract long add(RowKey key, ByteBuffer value);
     public abstract RangeIterator<Long, Token> search(Expression expression);
 
     public static MemIndex forColumn(AbstractType<?> keyValidator, ColumnIndex columnIndex)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
index 69b57d0..9c3562a 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
@@ -22,8 +22,8 @@ import java.util.*;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.*;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
@@ -34,7 +34,7 @@ public class SkipListMemIndex extends MemIndex
 {
     public static final int CSLM_OVERHEAD = 128; // average overhead of CSLM
 
-    private final ConcurrentSkipListMap<ByteBuffer, ConcurrentSkipListSet<DecoratedKey>> index;
+    private final ConcurrentSkipListMap<ByteBuffer, ConcurrentSkipListSet<RowKey>> index;
 
     public SkipListMemIndex(AbstractType<?> keyValidator, ColumnIndex columnIndex)
     {
@@ -42,14 +42,14 @@ public class SkipListMemIndex extends MemIndex
         index = new ConcurrentSkipListMap<>(columnIndex.getValidator());
     }
 
-    public long add(DecoratedKey key, ByteBuffer value)
+    public long add(RowKey key, ByteBuffer value)
     {
         long overhead = CSLM_OVERHEAD; // DKs are shared
-        ConcurrentSkipListSet<DecoratedKey> keys = index.get(value);
+        ConcurrentSkipListSet<RowKey> keys = index.get(value);
 
         if (keys == null)
         {
-            ConcurrentSkipListSet<DecoratedKey> newKeys = new ConcurrentSkipListSet<>(DecoratedKey.comparator);
+            ConcurrentSkipListSet<RowKey> newKeys = new ConcurrentSkipListSet<>();
             keys = index.putIfAbsent(value, newKeys);
             if (keys == null)
             {
@@ -68,7 +68,7 @@ public class SkipListMemIndex extends MemIndex
         ByteBuffer min = expression.lower == null ? null : expression.lower.value;
         ByteBuffer max = expression.upper == null ? null : expression.upper.value;
 
-        SortedMap<ByteBuffer, ConcurrentSkipListSet<DecoratedKey>> search;
+        SortedMap<ByteBuffer, ConcurrentSkipListSet<RowKey>> search;
 
         if (min == null && max == null)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
index ca60ac5..e1c273d 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
@@ -23,9 +23,8 @@ import java.util.List;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.index.sasi.conf.ColumnIndex;
-import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.disk.*;
 import org.apache.cassandra.index.sasi.disk.Token;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.plan.Expression.Op;
@@ -38,7 +37,7 @@ import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
 import com.googlecode.concurrenttrees.suffix.ConcurrentSuffixTree;
 import com.googlecode.concurrenttrees.radix.node.concrete.SmartArrayBasedNodeFactory;
 import com.googlecode.concurrenttrees.radix.node.Node;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.*;
 
 
 import org.slf4j.Logger;
@@ -71,7 +70,7 @@ public class TrieMemIndex extends MemIndex
         }
     }
 
-    public long add(DecoratedKey key, ByteBuffer value)
+    public long add(RowKey key, ByteBuffer value)
     {
         AbstractAnalyzer analyzer = columnIndex.getAnalyzer();
         analyzer.reset(value.duplicate());
@@ -85,7 +84,7 @@ public class TrieMemIndex extends MemIndex
             {
                 logger.info("Can't add term of column {} to index for key: {}, term size {}, max allowed size {}, use analyzed = true (if not yet set) for that column.",
                             columnIndex.getColumnName(),
-                            keyValidator.getString(key.getKey()),
+                            keyValidator.getString(key.decoratedKey.getKey()),
                             FBUtilities.prettyPrintMemory(term.remaining()),
                             FBUtilities.prettyPrintMemory(OnDiskIndexBuilder.MAX_TERM_SIZE));
                 continue;
@@ -113,13 +112,13 @@ public class TrieMemIndex extends MemIndex
             definition = column;
         }
 
-        public long add(String value, DecoratedKey key)
+        public long add(String value, RowKey key)
         {
             long overhead = CSLM_OVERHEAD;
-            ConcurrentSkipListSet<DecoratedKey> keys = get(value);
+            ConcurrentSkipListSet<RowKey> keys = get(value);
             if (keys == null)
             {
-                ConcurrentSkipListSet<DecoratedKey> newKeys = new ConcurrentSkipListSet<>(DecoratedKey.comparator);
+                ConcurrentSkipListSet<RowKey> newKeys = new ConcurrentSkipListSet<>();
                 keys = putIfAbsent(value, newKeys);
                 if (keys == null)
                 {
@@ -141,10 +140,10 @@ public class TrieMemIndex extends MemIndex
         {
             ByteBuffer prefix = expression.lower == null ? null : expression.lower.value;
 
-            Iterable<ConcurrentSkipListSet<DecoratedKey>> search = search(expression.getOp(), definition.cellValueType().getString(prefix));
+            Iterable<ConcurrentSkipListSet<RowKey>> search = search(expression.getOp(), definition.cellValueType().getString(prefix));
 
             RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
-            for (ConcurrentSkipListSet<DecoratedKey> keys : search)
+            for (ConcurrentSkipListSet<RowKey> keys : search)
             {
                 if (!keys.isEmpty())
                     builder.add(new KeyRangeIterator(keys));
@@ -153,14 +152,14 @@ public class TrieMemIndex extends MemIndex
             return builder.build();
         }
 
-        protected abstract ConcurrentSkipListSet<DecoratedKey> get(String value);
-        protected abstract Iterable<ConcurrentSkipListSet<DecoratedKey>> search(Op operator, String value);
-        protected abstract ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> key);
+        protected abstract ConcurrentSkipListSet<RowKey> get(String value);
+        protected abstract Iterable<ConcurrentSkipListSet<RowKey>> search(Op operator, String value);
+        protected abstract ConcurrentSkipListSet<RowKey> putIfAbsent(String value, ConcurrentSkipListSet<RowKey> key);
     }
 
     protected static class ConcurrentPrefixTrie extends ConcurrentTrie
     {
-        private final ConcurrentRadixTree<ConcurrentSkipListSet<DecoratedKey>> trie;
+        private final ConcurrentRadixTree<ConcurrentSkipListSet<RowKey>> trie;
 
         private ConcurrentPrefixTrie(ColumnDefinition column)
         {
@@ -168,23 +167,23 @@ public class TrieMemIndex extends MemIndex
             trie = new ConcurrentRadixTree<>(NODE_FACTORY);
         }
 
-        public ConcurrentSkipListSet<DecoratedKey> get(String value)
+        public ConcurrentSkipListSet<RowKey> get(String value)
         {
             return trie.getValueForExactKey(value);
         }
 
-        public ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> newKeys)
+        public ConcurrentSkipListSet<RowKey> putIfAbsent(String value, ConcurrentSkipListSet<RowKey> newKeys)
         {
             return trie.putIfAbsent(value, newKeys);
         }
 
-        public Iterable<ConcurrentSkipListSet<DecoratedKey>> search(Op operator, String value)
+        public Iterable<ConcurrentSkipListSet<RowKey>> search(Op operator, String value)
         {
             switch (operator)
             {
                 case EQ:
                 case MATCH:
-                    ConcurrentSkipListSet<DecoratedKey> keys = trie.getValueForExactKey(value);
+                    ConcurrentSkipListSet<RowKey> keys = trie.getValueForExactKey(value);
                     return keys == null ? Collections.emptyList() : Collections.singletonList(keys);
 
                 case PREFIX:
@@ -198,7 +197,7 @@ public class TrieMemIndex extends MemIndex
 
     protected static class ConcurrentSuffixTrie extends ConcurrentTrie
     {
-        private final ConcurrentSuffixTree<ConcurrentSkipListSet<DecoratedKey>> trie;
+        private final ConcurrentSuffixTree<ConcurrentSkipListSet<RowKey>> trie;
 
         private ConcurrentSuffixTrie(ColumnDefinition column)
         {
@@ -206,23 +205,23 @@ public class TrieMemIndex extends MemIndex
             trie = new ConcurrentSuffixTree<>(NODE_FACTORY);
         }
 
-        public ConcurrentSkipListSet<DecoratedKey> get(String value)
+        public ConcurrentSkipListSet<RowKey> get(String value)
         {
             return trie.getValueForExactKey(value);
         }
 
-        public ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> newKeys)
+        public ConcurrentSkipListSet<RowKey> putIfAbsent(String value, ConcurrentSkipListSet<RowKey> newKeys)
         {
             return trie.putIfAbsent(value, newKeys);
         }
 
-        public Iterable<ConcurrentSkipListSet<DecoratedKey>> search(Op operator, String value)
+        public Iterable<ConcurrentSkipListSet<RowKey>> search(Op operator, String value)
         {
             switch (operator)
             {
                 case EQ:
                 case MATCH:
-                    ConcurrentSkipListSet<DecoratedKey> keys = trie.getValueForExactKey(value);
+                    ConcurrentSkipListSet<RowKey> keys = trie.getValueForExactKey(value);
                     return keys == null ? Collections.emptyList() : Collections.singletonList(keys);
 
                 case SUFFIX:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
index fa1181f..af4e249 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
@@ -17,15 +17,18 @@
  */
 package org.apache.cassandra.index.sasi.plan;
 
+import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.DataLimits;
-import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.sasi.SASIIndex;
@@ -42,10 +45,12 @@ import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.*;
 
 public class QueryController
 {
+    private static final Logger logger = LoggerFactory.getLogger(QueryController.class);
+
     private final long executionQuota;
     private final long executionStart;
 
@@ -94,22 +99,26 @@ public class QueryController
         return index.isPresent() ? ((SASIIndex) index.get()).getIndex() : null;
     }
 
-
-    public UnfilteredRowIterator getPartition(DecoratedKey key, ReadExecutionController executionController)
+    public UnfilteredRowIterator getPartition(DecoratedKey key, NavigableSet<Clustering> clusterings, ReadExecutionController executionController)
     {
         if (key == null)
             throw new NullPointerException();
+
         try
         {
-            SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(command.isForThrift(),
-                                                                                     cfs.metadata,
+            ClusteringIndexFilter filter;
+            if (clusterings == null)
+                filter = new ClusteringIndexSliceFilter(Slices.ALL, false);
+            else
+                filter = new ClusteringIndexNamesFilter(clusterings, false);
+
+            SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(cfs.metadata,
                                                                                      command.nowInSec(),
                                                                                      command.columnFilter(),
-                                                                                     command.rowFilter().withoutExpressions(),
+                                                                                     command.rowFilter(),
                                                                                      DataLimits.NONE,
                                                                                      key,
-                                                                                     command.clusteringIndexFilter(key));
-
+                                                                                     filter);
             return partition.queryMemtableAndDisk(cfs, executionController);
         }
         finally
@@ -135,20 +144,24 @@ public class QueryController
 
         RangeIterator.Builder<Long, Token> builder = op == OperationType.OR
                                                 ? RangeUnionIterator.<Long, Token>builder()
-                                                : RangeIntersectionIterator.<Long, Token>builder();
+                                                : RangeIntersectionIterator.<Long, org.apache.cassandra.index.sasi.disk.Token>builder();
 
         List<RangeIterator<Long, Token>> perIndexUnions = new ArrayList<>();
 
         for (Map.Entry<Expression, Set<SSTableIndex>> e : getView(op, expressions).entrySet())
         {
-            @SuppressWarnings("resource") // RangeIterators are closed by releaseIndexes
-            RangeIterator<Long, Token> index = TermIterator.build(e.getKey(), e.getValue());
-
-            if (index == null)
-                continue;
+            try (RangeIterator<Long, Token> index = TermIterator.build(e.getKey(), e.getValue()))
+            {
+                if (index == null)
+                    continue;
 
-            builder.add(index);
-            perIndexUnions.add(index);
+                builder.add(index);
+                perIndexUnions.add(index);
+            }
+            catch (IOException ex)
+            {
+                logger.error("Failed to release index: ", ex);
+            }
         }
 
         resources.put(expressions, perIndexUnions);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
index 4410756..ccb369c 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
@@ -19,16 +19,19 @@ package org.apache.cassandra.index.sasi.plan;
 
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.index.sasi.disk.*;
 import org.apache.cassandra.index.sasi.disk.Token;
-import org.apache.cassandra.index.sasi.plan.Operation.OperationType;
-import org.apache.cassandra.exceptions.RequestTimeoutException;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.index.sasi.plan.Operation.*;
+import org.apache.cassandra.utils.btree.*;
 
 public class QueryPlan
 {
@@ -68,14 +71,16 @@ public class QueryPlan
         return new ResultIterator(analyze(), controller, executionController);
     }
 
-    private static class ResultIterator extends AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator
+    private static class ResultIterator implements UnfilteredPartitionIterator
     {
         private final AbstractBounds<PartitionPosition> keyRange;
         private final Operation operationTree;
         private final QueryController controller;
         private final ReadExecutionController executionController;
 
-        private Iterator<DecoratedKey> currentKeys = null;
+        private Iterator<RowKey> currentKeys = null;
+        private UnfilteredRowIterator nextPartition = null;
+        private DecoratedKey lastPartitionKey = null;
 
         public ResultIterator(Operation operationTree, QueryController controller, ReadExecutionController executionController)
         {
@@ -87,53 +92,152 @@ public class QueryPlan
                 operationTree.skipTo((Long) keyRange.left.getToken().getTokenValue());
         }
 
-        protected UnfilteredRowIterator computeNext()
+        public boolean hasNext()
+        {
+            return nextPartition != null || prepareNext(); // idempotent: keep an already prepared, unconsumed partition
+        }
+
+        public UnfilteredRowIterator next()
+        {
+            if (nextPartition == null && !prepareNext())
+                throw new NoSuchElementException(); // Iterator contract: never hand out null once exhausted
+
+            UnfilteredRowIterator toReturn = nextPartition;
+            nextPartition = null; // ownership passes to the caller, who is responsible for closing it
+            return toReturn;
+        }
+
+        private boolean prepareNext() // advances to the next non-empty partition; returns true only after setting nextPartition
         {
             if (operationTree == null)
-                return endOfData();
+                return false;
+
+            if (nextPartition != null)
+                nextPartition.close();
 
             for (;;)
             {
                 if (currentKeys == null || !currentKeys.hasNext())
                 {
                     if (!operationTree.hasNext())
-                         return endOfData();
+                        return false;
 
                     Token token = operationTree.next();
                     currentKeys = token.iterator();
                 }
 
-                while (currentKeys.hasNext())
+                CFMetaData metadata = controller.metadata();
+                BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(metadata.comparator);
+                // set when a matched key has a static (or no) clustering: the whole partition then has to be read
+                boolean fetchWholePartition = false;
+
+                while (true)
                 {
-                    DecoratedKey key = currentKeys.next();
+                    if (!currentKeys.hasNext())
+                    {
+                        // No more keys for this token.
+                        // If no clusterings were collected yet, exit this inner loop so the operation
+                        // tree iterator can move on to the next token.
+                        // If some clusterings were collected, build an iterator for those rows
+                        // and return.
+                        if ((clusterings.isEmpty() && !fetchWholePartition) || lastPartitionKey == null)
+                            break;
+
+                        UnfilteredRowIterator partition = fetchPartition(lastPartitionKey, clusterings.build(), fetchWholePartition);
+                        // Prepare for next partition, reset partition key and clusterings
+                        lastPartitionKey = null;
+                        clusterings = BTreeSet.builder(metadata.comparator);
+
+                        if (partition.isEmpty())
+                        {
+                            partition.close();
+                            continue;
+                        }
+
+                        nextPartition = partition;
+                        return true;
+                    }
+
+                    RowKey fullKey = currentKeys.next();
+                    DecoratedKey key = fullKey.decoratedKey;
 
                     if (!keyRange.right.isMinimum() && keyRange.right.compareTo(key) < 0)
-                        return endOfData();
+                        return false;
 
-                    try (UnfilteredRowIterator partition = controller.getPartition(key, executionController))
+                    if (lastPartitionKey != null && metadata.getKeyValidator().compare(lastPartitionKey.getKey(), key.getKey()) != 0)
                     {
-                        Row staticRow = partition.staticRow();
-                        List<Unfiltered> clusters = new ArrayList<>();
+                        UnfilteredRowIterator partition = fetchPartition(lastPartitionKey, clusterings.build(), fetchWholePartition);
 
-                        while (partition.hasNext())
+                        if (partition.isEmpty())
+                            partition.close();
+                        else
                         {
-                            Unfiltered row = partition.next();
-                            if (operationTree.satisfiedBy(row, staticRow, true))
-                                clusters.add(row);
+                            nextPartition = partition;
+                            return true;
                         }
-
-                        if (!clusters.isEmpty())
-                            return new PartitionIterator(partition, clusters);
                     }
+                    // NOTE(review): the branch above does not reset clusterings, and when it returns fullKey is never recorded — confirm.
+                    lastPartitionKey = key;
+
+                    // Whole partition is read when the key has no clustering (index versions before AC) or a static one (static column index in AC)
+                    if (fullKey.clustering == null || fullKey.clustering.clustering().kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING)
+                        fetchWholePartition = true;
+                    else
+                        clusterings.add(fullKey.clustering);
+
                 }
             }
         }
 
+        private UnfilteredRowIterator fetchPartition(DecoratedKey key, NavigableSet<Clustering> clusterings, boolean fetchWholePartition)
+        {
+            if (fetchWholePartition)
+                clusterings = null; // presumably null makes getPartition read every row — confirm in QueryController
+            // the source partition is closed when this method returns; only the rows buffered in 'clusters' are handed on
+            try (UnfilteredRowIterator partition = controller.getPartition(key, clusterings, executionController))
+            {
+                Row staticRow = partition.staticRow();
+                List<Unfiltered> clusters = new ArrayList<>();
+
+                while (partition.hasNext())
+                {
+                    Unfiltered row = partition.next();
+                    if (operationTree.satisfiedBy(row, staticRow, true)) // keep only rows the operation tree accepts
+                        clusters.add(row);
+                }
+
+                if (!clusters.isEmpty())
+                    return new PartitionIterator(partition, clusters);
+                else // no row was accepted: return a row-less iterator, which prepareNext() detects via isEmpty()
+                    return UnfilteredRowIterators.noRowsIterator(partition.metadata(),
+                                                                 partition.partitionKey(),
+                                                                 Rows.EMPTY_STATIC_ROW,
+                                                                 partition.partitionLevelDeletion(),
+                                                                 partition.isReverseOrder());
+            }
+        }
+
+        public void close() // NOTE(review): the close() removed by this patch also ran FileUtils.closeQuietly(operationTree) and controller.finish() — confirm that cleanup still happens
+        {
+            if (nextPartition != null) // a partition prepared by hasNext() but never handed out by next()
+                nextPartition.close();
+        }
+
+        public boolean isForThrift()
+        {
+            return controller.isForThrift();
+        }
+
+        public CFMetaData metadata()
+        {
+            return controller.metadata();
+        }
+
         private static class PartitionIterator extends AbstractUnfilteredRowIterator
         {
             private final Iterator<Unfiltered> rows;
 
-            public PartitionIterator(UnfilteredRowIterator partition, Collection<Unfiltered> content)
+            public PartitionIterator(UnfilteredRowIterator partition, Collection<Unfiltered> filteredRows)
             {
                 super(partition.metadata(),
                       partition.partitionKey(),
@@ -143,7 +247,7 @@ public class QueryPlan
                       partition.isReverseOrder(),
                       partition.stats());
 
-                rows = content.iterator();
+                rows = filteredRows.iterator();
             }
 
             @Override
@@ -152,21 +256,5 @@ public class QueryPlan
                 return rows.hasNext() ? rows.next() : endOfData();
             }
         }
-
-        public boolean isForThrift()
-        {
-            return controller.isForThrift();
-        }
-
-        public CFMetaData metadata()
-        {
-            return controller.metadata();
-        }
-
-        public void close()
-        {
-            FileUtils.closeQuietly(operationTree);
-            controller.finish();
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
index f0b6bac..35898aa 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
@@ -46,6 +46,11 @@ public interface SSTableFlushObserver
      *
      * @param unfilteredCluster The unfiltered cluster being added to SSTable.
      */
+    default void nextUnfilteredCluster(Unfiltered unfilteredCluster, long position)
+    {
+        nextUnfilteredCluster(unfilteredCluster); // default ignores position and forwards to the single-argument overload
+    }
+
     void nextUnfilteredCluster(Unfiltered unfilteredCluster);
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 87d2a6e..8442ed7 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -50,8 +50,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.rows.EncodingStats;
-import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -63,6 +62,7 @@ import org.apache.cassandra.io.sstable.metadata.*;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.net.*;
 import org.apache.cassandra.schema.CachingParams;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -1780,6 +1780,35 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     }
 
     /**
+     * Reads the clustering key of the row stored at the given position in the data file of the current sstable.
+     *
+     * @param rowPosition start position of given row in the data file
+     * @return clustering of the row ({@code Clustering.STATIC_CLUSTERING} for a static row), or {@code null} if the reader is already at EOF
+     * @throws IOException if reading the row header or the clustering from the data file fails
+     */
+    public Clustering clusteringAt(long rowPosition) throws IOException
+    {
+        Clustering clustering;
+        try (FileDataInput in = dfile.createReader(rowPosition))
+        {
+            if (in.isEOF())
+                return null;
+
+            int flags = in.readUnsignedByte();
+            int extendedFlags = UnfilteredSerializer.readExtendedFlags(in, flags);
+            boolean isStatic = UnfilteredSerializer.isStatic(extendedFlags);
+
+            if (isStatic)
+                clustering = Clustering.STATIC_CLUSTERING;
+            else
+                // Since this is an internal call, we don't have to take care of protocol versions that use legacy layout
+                clustering = Clustering.serializer.deserialize(in, MessagingService.VERSION_30, header.clusteringTypes());
+        }
+
+        return clustering;
+    }
+
+
+    /**
      * TODO: Move someplace reusable
      */
     public abstract static class Operator

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 5696ecb..93d9822 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -21,9 +21,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -163,7 +161,9 @@ public class BigTableWriter extends SSTableWriter
             return null;
 
         long startPosition = beforeAppend(key);
-        observers.forEach((o) -> o.startPartition(key, iwriter.indexFile.position()));
+        observers.forEach((o) -> {
+            o.startPartition(key, iwriter.indexFile.position());
+        });
 
         //Reuse the writer for each row
         columnIndexWriter.reset();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/utils/obs/BitUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/BitUtil.java b/src/java/org/apache/cassandra/utils/obs/BitUtil.java
index e04de2b..c438d1b 100644
--- a/src/java/org/apache/cassandra/utils/obs/BitUtil.java
+++ b/src/java/org/apache/cassandra/utils/obs/BitUtil.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.utils.obs;
 /**  A variety of high efficiency bit twiddling routines.
  * @lucene.internal
  */
-final class BitUtil
+public final class BitUtil
 {
 
   /** Returns the number of bits set in the long */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/data/legacy-sasi/on-disk-sa-int2.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sasi/on-disk-sa-int2.db b/test/data/legacy-sasi/on-disk-sa-int2.db
new file mode 100644
index 0000000..71f662f
Binary files /dev/null and b/test/data/legacy-sasi/on-disk-sa-int2.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
index 0b4e9e2..fc5afac 100644
--- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
@@ -76,6 +76,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
+import com.google.common.collect.Sets;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 
@@ -92,7 +93,7 @@ public class SASIIndexTest
         PARTITIONER = Murmur3Partitioner.instance;
     }
 
-    private static final String KS_NAME = "sasi";
+    private static final String KS_NAME = "sasi_index_test";
     private static final String CF_NAME = "test_cf";
     private static final String CLUSTERING_CF_NAME_1 = "clustering_test_cf_1";
     private static final String CLUSTERING_CF_NAME_2 = "clustering_test_cf_2";
@@ -448,9 +449,15 @@ public class SASIIndexTest
         if (forceFlush)
             store.forceBlockingFlush();
 
-        final UntypedResultSet results = executeCQL(FTS_CF_NAME, "SELECT * FROM %s.%s WHERE artist LIKE 'lady%%'");
-        Assert.assertNotNull(results);
-        Assert.assertEquals(3, results.size());
+        CQLTester.assertRowsIgnoringOrder(executeCQL(FTS_CF_NAME, "SELECT * FROM %s.%s WHERE artist LIKE 'lady%%'"),
+                                          CQLTester.row(UUID.fromString("1a4abbcd-b5de-4c69-a578-31231e01ff09"), "Lady Gaga", "Poker Face"),
+                                          CQLTester.row(UUID.fromString("4f8dc18e-54e6-4e16-b507-c5324b61523b"), "Lady Pank", "Zamki na piasku"),
+                                          CQLTester.row(UUID.fromString("eaf294fa-bad5-49d4-8f08-35ba3636a706"), "Lady Pank", "Koncertowa"));
+
+        CQLTester.assertRowsIgnoringOrder(executeCQL(FTS_CF_NAME, "SELECT artist, title FROM %s.%s WHERE artist LIKE 'lady%%'"),
+                                          CQLTester.row("Lady Gaga", "Poker Face"),
+                                          CQLTester.row("Lady Pank", "Zamki na piasku"),
+                                          CQLTester.row("Lady Pank", "Koncertowa"));
     }
 
     @Test
@@ -664,7 +671,7 @@ public class SASIIndexTest
                 add("key21");
         }};
 
-        Assert.assertEquals(expected, convert(uniqueKeys));
+        Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys)));
 
         // now let's test a single equals condition
 
@@ -690,7 +697,7 @@ public class SASIIndexTest
                 add("key21");
         }};
 
-        Assert.assertEquals(expected, convert(uniqueKeys));
+        Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys)));
 
         // now let's test something which is smaller than a single page
         uniqueKeys = getPaged(store, 4,
@@ -704,7 +711,7 @@ public class SASIIndexTest
                 add("key07");
         }};
 
-        Assert.assertEquals(expected, convert(uniqueKeys));
+        Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys)));
 
         // the same but with the page size of 2 to test minimal pagination windows
 
@@ -712,7 +719,7 @@ public class SASIIndexTest
                               buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
                               buildExpression(age, Operator.EQ, Int32Type.instance.decompose(36)));
 
-        Assert.assertEquals(expected, convert(uniqueKeys));
+        Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys)));
 
         // and last but not least, test age range query with pagination
         uniqueKeys = getPaged(store, 4,
@@ -736,7 +743,7 @@ public class SASIIndexTest
                 add("key21");
         }};
 
-        Assert.assertEquals(expected, convert(uniqueKeys));
+        Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys)));
 
         Set<String> rows;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java
new file mode 100644
index 0000000..21ef070
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi.disk;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class KeyOffsetsTest
+{
+    // Offsets that are already present must not change get(), whether re-added in bulk (long[]) or one by one.
+    @Test
+    public void testDuplicates()
+    {
+        KeyOffsets offsets = new KeyOffsets();
+        long[] arr = new long[]{ 1, 2, 3, 4, 5, 6 };
+        offsets.put(1, arr);
+        Assert.assertArrayEquals(arr, offsets.get(1));
+        offsets.put(1, arr);
+        Assert.assertArrayEquals(arr, offsets.get(1));
+        for (long l : arr)
+            offsets.put(1, l);
+        Assert.assertArrayEquals(arr, offsets.get(1));
+
+        for (long l : arr)
+            offsets.put(2, l);
+        Assert.assertArrayEquals(arr, offsets.get(2));
+        offsets.put(2, arr);
+        Assert.assertArrayEquals(arr, offsets.get(2));
+        offsets.put(2, arr);
+        Assert.assertArrayEquals(arr, offsets.get(2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
index 10dc7a8..b56cb4e 100644
--- a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
@@ -24,13 +24,16 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import com.carrotsearch.hppc.cursors.LongObjectCursor;
 import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.BufferDecoratedKey;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.index.sasi.plan.Expression;
 import org.apache.cassandra.index.sasi.utils.CombinedTerm;
 import org.apache.cassandra.index.sasi.utils.CombinedTermIterator;
+import org.apache.cassandra.index.sasi.utils.KeyConverter;
 import org.apache.cassandra.index.sasi.utils.OnDiskIndexIterator;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -38,13 +41,8 @@ import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.utils.MurmurHash;
 import org.apache.cassandra.utils.Pair;
 
-import com.carrotsearch.hppc.LongSet;
-import com.carrotsearch.hppc.cursors.LongCursor;
-
-import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
 
@@ -87,7 +85,7 @@ public class OnDiskIndexTest
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter());
+        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance);
 
         // first check if we can find exact matches
         for (Map.Entry<ByteBuffer, TokenTreeBuilder> e : data.entrySet())
@@ -95,11 +93,13 @@ public class OnDiskIndexTest
             if (UTF8Type.instance.getString(e.getKey()).equals("cat"))
                 continue; // cat is embedded into scat, we'll test it in next section
 
-            Assert.assertEquals("Key was: " + UTF8Type.instance.compose(e.getKey()), convert(e.getValue()), convert(onDisk.search(expressionFor(UTF8Type.instance, e.getKey()))));
+            Assert.assertEquals("Key was: " + UTF8Type.instance.compose(e.getKey()),
+                                convert(e.getValue()),
+                                convert(onDisk.search(expressionFor(UTF8Type.instance, e.getKey()))));
         }
 
         // check that cat returns positions for scat & cat
-        Assert.assertEquals(convert(1, 4), convert(onDisk.search(expressionFor("cat"))));
+        Assert.assertEquals(convert(1L, 4L), convert(onDisk.search(expressionFor("cat"))));
 
         // random suffix queries
         Assert.assertEquals(convert(9, 10), convert(onDisk.search(expressionFor("ar"))));
@@ -143,7 +143,7 @@ public class OnDiskIndexTest
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter());
+        OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, KeyConverter.instance);
 
         for (Map.Entry<ByteBuffer, TokenTreeBuilder> e : data.entrySet())
         {
@@ -224,14 +224,14 @@ public class OnDiskIndexTest
 
         OnDiskIndexBuilder iterTest = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX);
         for (int i = 0; i < iterCheckNums.size(); i++)
-            iterTest.add(iterCheckNums.get(i), keyAt((long) i), i);
+            iterTest.add(iterCheckNums.get(i), keyAt((long) i), i, i + 5);
 
         File iterIndex = File.createTempFile("sa-iter", ".db");
         iterIndex.deleteOnExit();
 
         iterTest.finish(iterIndex);
 
-        onDisk = new OnDiskIndex(iterIndex, Int32Type.instance, new KeyConverter());
+        onDisk = new OnDiskIndex(iterIndex, Int32Type.instance, KeyConverter.instance);
 
         ByteBuffer number = Int32Type.instance.decompose(1);
         Assert.assertEquals(0, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.ASC, false)));
@@ -283,7 +283,7 @@ public class OnDiskIndexTest
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter());
+        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance);
 
         Assert.assertEquals(convert(1, 2, 3, 4, 5, 6), convert(onDisk.search(expressionFor("liz"))));
         Assert.assertEquals(convert(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), convert(onDisk.search(expressionFor("a"))));
@@ -315,14 +315,14 @@ public class OnDiskIndexTest
         final int numIterations = 100000;
 
         for (long i = 0; i < numIterations; i++)
-            builder.add(LongType.instance.decompose(start + i), keyAt(i), i);
+            builder.add(LongType.instance.decompose(start + i), keyAt(i), i, clusteringOffset(i));
 
         File index = File.createTempFile("on-disk-sa-sparse", "db");
         index.deleteOnExit();
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, new KeyConverter());
+        OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, KeyConverter.instance);
 
         ThreadLocalRandom random = ThreadLocalRandom.current();
 
@@ -343,9 +343,9 @@ public class OnDiskIndexTest
             if (upperInclusive)
                 upperKey += 1;
 
-            Set<DecoratedKey> actual = convert(rows);
+            Set<RowKey> actual = convert(rows);
             for (long key = lowerKey; key < upperKey; key++)
-                Assert.assertTrue("key" + key + " wasn't found", actual.contains(keyAt(key)));
+                Assert.assertTrue("key" + key + " wasn't found", actual.contains(new RowKey(keyAt(key), ck(clusteringOffset(key)), CLUSTERING_COMPARATOR)));
 
             Assert.assertEquals((upperKey - lowerKey), actual.size());
         }
@@ -353,7 +353,7 @@ public class OnDiskIndexTest
         // let's also explicitly test whole range search
         RangeIterator<Long, Token> rows = onDisk.search(expressionFor(start, true, start + numIterations, true));
 
-        Set<DecoratedKey> actual = convert(rows);
+        Set<RowKey> actual = convert(rows);
         Assert.assertEquals(numIterations, actual.size());
     }
 
@@ -380,7 +380,7 @@ public class OnDiskIndexTest
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter());
+        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance);
 
         // test whole words first
         Assert.assertEquals(convert(3, 4, 5, 6, 7, 8, 9, 10), convert(onDisk.search(expressionForNot("Aleksey", "Vijay", "Pavel"))));
@@ -424,7 +424,7 @@ public class OnDiskIndexTest
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter());
+        OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, KeyConverter.instance);
 
         Assert.assertEquals(convert(1, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12), convert(onDisk.search(expressionForNot(0, 10, 1))));
         Assert.assertEquals(convert(1, 2, 4, 5, 7, 9, 10, 11, 12), convert(onDisk.search(expressionForNot(0, 10, 1, 8))));
@@ -439,16 +439,16 @@ public class OnDiskIndexTest
         final long lower = 0;
         final long upper = 100000;
 
-        OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE);
+        OnDiskIndexBuilder builder = new OnDiskIndexBuilder(LongType.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE);
         for (long i = lower; i <= upper; i++)
-            builder.add(LongType.instance.decompose(i), keyAt(i), i);
+            builder.add(LongType.instance.decompose(i), keyAt(i), i, clusteringOffset(i));
 
         File index = File.createTempFile("on-disk-sa-except-long-ranges", "db");
         index.deleteOnExit();
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, new KeyConverter());
+        OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, KeyConverter.instance);
 
         ThreadLocalRandom random = ThreadLocalRandom.current();
 
@@ -503,10 +503,10 @@ public class OnDiskIndexTest
     private int validateExclusions(OnDiskIndex sa, long lower, long upper, Set<Long> exclusions, boolean checkCount)
     {
         int count = 0;
-        for (DecoratedKey key : convert(sa.search(rangeWithExclusions(lower, true, upper, true, exclusions))))
+        for (RowKey key : convert(sa.search(rangeWithExclusions(lower, true, upper, true, exclusions))))
         {
-            String keyId = UTF8Type.instance.getString(key.getKey()).split("key")[1];
-            Assert.assertFalse("key" + keyId + " is present.", exclusions.contains(Long.valueOf(keyId)));
+            long keyId = LongType.instance.compose(key.decoratedKey.getKey());
+            Assert.assertFalse("key" + keyId + " is present.", exclusions.contains(keyId));
             count++;
         }
 
@@ -519,40 +519,49 @@ public class OnDiskIndexTest
     @Test
     public void testDescriptor() throws Exception
     {
-        final Map<ByteBuffer, Pair<DecoratedKey, Long>> data = new HashMap<ByteBuffer, Pair<DecoratedKey, Long>>()
+        final Map<ByteBuffer, Pair<RowKey, Long>> data = new HashMap<ByteBuffer, Pair<RowKey, Long>>()
         {{
-                put(Int32Type.instance.decompose(5), Pair.create(keyAt(1L), 1L));
+                put(Int32Type.instance.decompose(5), Pair.create(new RowKey(keyAt(1L), ck(clusteringOffset(1L)), CLUSTERING_COMPARATOR) , 1L));
         }};
 
         OnDiskIndexBuilder builder1 = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX);
-        OnDiskIndexBuilder builder2 = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX);
-        for (Map.Entry<ByteBuffer, Pair<DecoratedKey, Long>> e : data.entrySet())
+        for (Map.Entry<ByteBuffer, Pair<RowKey, Long>> e : data.entrySet())
         {
-            DecoratedKey key = e.getValue().left;
+            DecoratedKey key = e.getValue().left.decoratedKey;
             Long position = e.getValue().right;
 
-            builder1.add(e.getKey(), key, position);
-            builder2.add(e.getKey(), key, position);
+            builder1.add(e.getKey(), key, position, clusteringOffset(position));
         }
 
         File index1 = File.createTempFile("on-disk-sa-int", "db");
-        File index2 = File.createTempFile("on-disk-sa-int2", "db");
+
         index1.deleteOnExit();
-        index2.deleteOnExit();
 
         builder1.finish(index1);
-        builder2.finish(new Descriptor(Descriptor.VERSION_AA), index2);
+        OnDiskIndex onDisk1 = new OnDiskIndex(index1, Int32Type.instance, KeyConverter.instance);
+        ByteBuffer number = Int32Type.instance.decompose(5);
+        Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk1.search(expressionFor(Operator.EQ, Int32Type.instance, number))));
+        Assert.assertEquals(Descriptor.CURRENT_VERSION, onDisk1.descriptor.version); // (expected, actual): a freshly built index carries the current version
+    }
 
-        OnDiskIndex onDisk1 = new OnDiskIndex(index1, Int32Type.instance, new KeyConverter());
-        OnDiskIndex onDisk2 = new OnDiskIndex(index2, Int32Type.instance, new KeyConverter());
 
-        ByteBuffer number = Int32Type.instance.decompose(5);
+    static final String DATA_DIR = "test/data/legacy-sasi/";
 
-        Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk1.search(expressionFor(Operator.EQ, Int32Type.instance, number))));
+    @Test
+    public void testLegacyDescriptor() throws Exception
+    {
+        final Map<ByteBuffer, Pair<RowKey, Long>> data = new HashMap<ByteBuffer, Pair<RowKey, Long>>()
+        {{
+            put(Int32Type.instance.decompose(5), Pair.create(new RowKey(keyAt(1L), ck(KeyOffsets.NO_OFFSET), CLUSTERING_COMPARATOR) , 1L));
+        }};
+
+        File index2 = new File(DATA_DIR + "on-disk-sa-int2.db"); // checked-in index file from test/data/legacy-sasi/
+        OnDiskIndex onDisk2 = new OnDiskIndex(index2, Int32Type.instance, KeyConverter.instance);
+
+        ByteBuffer number = Int32Type.instance.decompose(5);
         Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk2.search(expressionFor(Operator.EQ, Int32Type.instance, number))));
 
-        Assert.assertEquals(onDisk1.descriptor.version.version, Descriptor.CURRENT_VERSION);
-        Assert.assertEquals(onDisk2.descriptor.version.version, Descriptor.VERSION_AA);
+        Assert.assertEquals(Descriptor.VERSION_AA, onDisk2.descriptor.version); // (expected, actual): the legacy file must still report version AA
     }
 
     @Test
@@ -574,7 +583,7 @@ public class OnDiskIndexTest
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter());
+        OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, KeyConverter.instance);
         OnDiskIndex.OnDiskSuperBlock superBlock = onDisk.dataLevel.getSuperBlock(0);
         Iterator<Token> iter = superBlock.iterator();
 
@@ -595,14 +604,14 @@ public class OnDiskIndexTest
     {
         OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE);
         for (long i = 0; i < 100000; i++)
-            builder.add(LongType.instance.decompose(i), keyAt(i), i);
+            builder.add(LongType.instance.decompose(i), keyAt(i), i, clusteringOffset(i));
 
         File index = File.createTempFile("on-disk-sa-multi-superblock-match", ".db");
         index.deleteOnExit();
 
         builder.finish(index);
 
-        OnDiskIndex onDiskIndex = new OnDiskIndex(index, LongType.instance, new KeyConverter());
+        OnDiskIndex onDiskIndex = new OnDiskIndex(index, LongType.instance, KeyConverter.instance);
 
         testSearchRangeWithSuperBlocks(onDiskIndex, 0, 500);
         testSearchRangeWithSuperBlocks(onDiskIndex, 300, 93456);
@@ -617,9 +626,9 @@ public class OnDiskIndexTest
         }
     }
 
-    public void putAll(SortedMap<Long, LongSet> offsets, TokenTreeBuilder ttb)
+    public void putAll(SortedMap<Long, KeyOffsets> offsets, TokenTreeBuilder ttb)
     {
-        for (Pair<Long, LongSet> entry : ttb)
+        for (Pair<Long, KeyOffsets> entry : ttb)
             offsets.put(entry.left, entry.right);
     }
 
@@ -629,26 +638,26 @@ public class OnDiskIndexTest
         OnDiskIndexBuilder builderA = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX);
         OnDiskIndexBuilder builderB = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX);
 
-        TreeMap<Long, TreeMap<Long, LongSet>> expected = new TreeMap<>();
+        TreeMap<Long, TreeMap<Long, KeyOffsets>> expected = new TreeMap<>();
 
         for (long i = 0; i <= 100; i++)
         {
-            TreeMap<Long, LongSet> offsets = expected.get(i);
+            TreeMap<Long, KeyOffsets> offsets = expected.get(i);
             if (offsets == null)
                 expected.put(i, (offsets = new TreeMap<>()));
 
-            builderA.add(LongType.instance.decompose(i), keyAt(i), i);
+            builderA.add(LongType.instance.decompose(i), keyAt(i), i, clusteringOffset(i));
             putAll(offsets, keyBuilder(i));
         }
 
         for (long i = 50; i < 100; i++)
         {
-            TreeMap<Long, LongSet> offsets = expected.get(i);
+            TreeMap<Long, KeyOffsets> offsets = expected.get(i);
             if (offsets == null)
                 expected.put(i, (offsets = new TreeMap<>()));
 
             long position = 100L + i;
-            builderB.add(LongType.instance.decompose(i), keyAt(position), position);
+            builderB.add(LongType.instance.decompose(i), keyAt(position), position, clusteringOffset(position));
             putAll(offsets, keyBuilder(100L + i));
         }
 
@@ -661,19 +670,19 @@ public class OnDiskIndexTest
         builderA.finish(indexA);
         builderB.finish(indexB);
 
-        OnDiskIndex a = new OnDiskIndex(indexA, LongType.instance, new KeyConverter());
-        OnDiskIndex b = new OnDiskIndex(indexB, LongType.instance, new KeyConverter());
+        OnDiskIndex a = new OnDiskIndex(indexA, LongType.instance, KeyConverter.instance);
+        OnDiskIndex b = new OnDiskIndex(indexB, LongType.instance, KeyConverter.instance);
 
         RangeIterator<OnDiskIndex.DataTerm, CombinedTerm> union = OnDiskIndexIterator.union(a, b);
 
-        TreeMap<Long, TreeMap<Long, LongSet>> actual = new TreeMap<>();
+        TreeMap<Long, TreeMap<Long, KeyOffsets>> actual = new TreeMap<>();
         while (union.hasNext())
         {
             CombinedTerm term = union.next();
 
             Long composedTerm = LongType.instance.compose(term.getTerm());
 
-            TreeMap<Long, LongSet> offsets = actual.get(composedTerm);
+            TreeMap<Long, KeyOffsets> offsets = actual.get(composedTerm);
             if (offsets == null)
                 actual.put(composedTerm, (offsets = new TreeMap<>()));
 
@@ -688,7 +697,7 @@ public class OnDiskIndexTest
         OnDiskIndexBuilder combined = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX);
         combined.finish(Pair.create(keyAt(0).getKey(), keyAt(100).getKey()), indexC, new CombinedTermIterator(a, b));
 
-        OnDiskIndex c = new OnDiskIndex(indexC, LongType.instance, new KeyConverter());
+        OnDiskIndex c = new OnDiskIndex(indexC, LongType.instance, KeyConverter.instance);
         union = OnDiskIndexIterator.union(c);
         actual.clear();
 
@@ -698,7 +707,7 @@ public class OnDiskIndexTest
 
             Long composedTerm = LongType.instance.compose(term.getTerm());
 
-            TreeMap<Long, LongSet> offsets = actual.get(composedTerm);
+            TreeMap<Long, KeyOffsets> offsets = actual.get(composedTerm);
             if (offsets == null)
                 actual.put(composedTerm, (offsets = new TreeMap<>()));
 
@@ -738,7 +747,7 @@ public class OnDiskIndexTest
 
         builder.finish(index);
 
-        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter());
+        OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance);
 
         // check that lady% return lady gaga (1) and lady pank (3) but not lady of bells(2)
         Assert.assertEquals(convert(1, 3), convert(onDisk.search(expressionFor("lady", Operator.LIKE_PREFIX))));
@@ -762,7 +771,7 @@ public class OnDiskIndexTest
         while (tokens.hasNext())
         {
             Token token = tokens.next();
-            Iterator<DecoratedKey> keys = token.iterator();
+            Iterator<RowKey> keys = token.iterator();
 
             // each of the values should have exactly a single key
             Assert.assertTrue(keys.hasNext());
@@ -771,7 +780,7 @@ public class OnDiskIndexTest
 
             // and it's last should always smaller than current
             if (lastToken != null)
-                Assert.assertTrue("last should be less than current", lastToken.compareTo(token.get()) < 0);
+                Assert.assertTrue("last should be less than current", lastToken < token.get());
 
             lastToken = token.get();
             keyCount++;
@@ -780,61 +789,84 @@ public class OnDiskIndexTest
         Assert.assertEquals(end - start, keyCount);
     }
 
-    private static DecoratedKey keyAt(long rawKey)
+    private static DecoratedKey keyAt(long partitionOffset)
+    {
+        return KeyConverter.dk(partitionOffset);
+    }
+
+    private static Clustering ck(long rowOffset)
+    {
+        return KeyConverter.ck(rowOffset);
+    }
+
+    private TokenTreeBuilder keyBuilder(long... offsets)
     {
-        ByteBuffer key = ByteBuffer.wrap(("key" + rawKey).getBytes());
-        return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(MurmurHash.hash2_64(key, key.position(), key.remaining(), 0)), key);
+        TokenTreeBuilder builder = new DynamicTokenTreeBuilder();
+
+        for (final long pkOffset : offsets)
+        {
+            DecoratedKey k = keyAt(pkOffset);
+            builder.add((Long) k.getToken().getTokenValue(), pkOffset, clusteringOffset(pkOffset));
+        }
+
+        return builder.finish();
     }
 
-    private static TokenTreeBuilder keyBuilder(Long... keys)
+    private static long clusteringOffset(long offset)
+    {
+        return offset + 100;
+    }
+
+    private TokenTreeBuilder keyBuilder(Pair<Long, Long>... offsets)
     {
         TokenTreeBuilder builder = new DynamicTokenTreeBuilder();
 
-        for (final Long key : keys)
+        for (final Pair<Long,Long> key : offsets)
         {
-            DecoratedKey dk = keyAt(key);
-            builder.add((Long) dk.getToken().getTokenValue(), key);
+            DecoratedKey k = keyAt(key.left);
+            builder.add((Long) k.getToken().getTokenValue(), key.left, key.right);
         }
 
         return builder.finish();
     }
 
-    private static Set<DecoratedKey> convert(TokenTreeBuilder offsets)
+    private static final ClusteringComparator CLUSTERING_COMPARATOR = new ClusteringComparator(BytesType.instance);
+
+    private static Set<RowKey> convert(TokenTreeBuilder offsets)
     {
-        Set<DecoratedKey> result = new HashSet<>();
+        Set<RowKey> result = new HashSet<>();
 
-        Iterator<Pair<Long, LongSet>> offsetIter = offsets.iterator();
+        Iterator<Pair<Long, KeyOffsets>> offsetIter = offsets.iterator();
         while (offsetIter.hasNext())
         {
-            LongSet v = offsetIter.next().right;
+            Pair<Long, KeyOffsets> pair = offsetIter.next();
 
-            for (LongCursor offset : v)
-                result.add(keyAt(offset.value));
+            for (LongObjectCursor<long[]> cursor : pair.right)
+                for (long l : cursor.value)
+                    result.add(new RowKey(keyAt(cursor.key), ck(l), CLUSTERING_COMPARATOR));
         }
         return result;
     }
 
-    private static Set<DecoratedKey> convert(long... keyOffsets)
+    private static Set<RowKey> convert(long... keyOffsets)
     {
-        Set<DecoratedKey> result = new HashSet<>();
-        for (long offset : keyOffsets)
-            result.add(keyAt(offset));
+        Set<RowKey> result = new HashSet<>();
+        for (final long offset : keyOffsets)
+            result.add(new RowKey(keyAt(offset), ck(clusteringOffset(offset)), CLUSTERING_COMPARATOR));
 
         return result;
     }
 
-    private static Set<DecoratedKey> convert(RangeIterator<Long, Token> results)
+    private static Set<RowKey> convert(RangeIterator<Long, Token> results)
     {
         if (results == null)
             return Collections.emptySet();
 
-        Set<DecoratedKey> keys = new TreeSet<>(DecoratedKey.comparator);
+        Set<RowKey> keys = new TreeSet<>();
 
         while (results.hasNext())
-        {
-            for (DecoratedKey key : results.next())
+            for (RowKey key: results.next())
                 keys.add(key);
-        }
 
         return keys;
     }
@@ -908,19 +940,11 @@ public class OnDiskIndexTest
 
     private static void addAll(OnDiskIndexBuilder builder, ByteBuffer term, TokenTreeBuilder tokens)
     {
-        for (Pair<Long, LongSet> token : tokens)
-        {
-            for (long position : token.right.toArray())
-                builder.add(term, keyAt(position), position);
-        }
-    }
-
-    private static class KeyConverter implements Function<Long, DecoratedKey>
-    {
-        @Override
-        public DecoratedKey apply(Long offset)
+        for (Pair<Long, KeyOffsets> token : tokens)
         {
-            return keyAt(offset);
+            for (LongObjectCursor<long[]> cursor : token.right)
+                for (long clusteringOffset : cursor.value)
+                    builder.add(term, keyAt(cursor.key), cursor.key, clusteringOffset);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
index f19d962..61e4d67 100644
--- a/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
@@ -22,11 +22,14 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
 
+import com.carrotsearch.hppc.cursors.LongObjectCursor;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
@@ -35,6 +38,9 @@ import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.rows.BTreeRow;
 import org.apache.cassandra.db.rows.BufferCell;
 import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.index.sasi.KeyFetcher;
 import org.apache.cassandra.index.sasi.SASIIndex;
 import org.apache.cassandra.index.sasi.utils.RangeIterator;
 import org.apache.cassandra.db.marshal.Int32Type;
@@ -70,6 +76,8 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
                                                                      Tables.of(SchemaLoader.sasiCFMD(KS_NAME, CF_NAME))));
     }
 
+    private static final ClusteringComparator CLUSTERING_COMPARATOR = new ClusteringComparator(LongType.instance);
+
     @Test
     public void testPartialIndexWrites() throws Exception
     {
@@ -86,19 +94,20 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         Descriptor descriptor = Descriptor.fromFilename(cfs.getSSTablePath(directory));
         PerSSTableIndexWriter indexWriter = (PerSSTableIndexWriter) sasi.getFlushObserver(descriptor, OperationType.FLUSH);
 
-        SortedMap<DecoratedKey, Row> expectedKeys = new TreeMap<>(DecoratedKey.comparator);
+        SortedMap<RowKey, Row> expectedKeys = new TreeMap<>();
 
         for (int i = 0; i < maxKeys; i++)
         {
             ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, i));
-            expectedKeys.put(cfs.metadata.partitioner.decorateKey(key),
-                             BTreeRow.singleCellRow(Clustering.EMPTY,
+            Clustering clustering = Clustering.make(ByteBufferUtil.bytes(i * 1L));
+            expectedKeys.put(new RowKey(cfs.metadata.partitioner.decorateKey(key), clustering, CLUSTERING_COMPARATOR),
+                             BTreeRow.singleCellRow(clustering,
                                                     BufferCell.live(column, timestamp, Int32Type.instance.decompose(i))));
         }
 
         indexWriter.begin();
 
-        Iterator<Map.Entry<DecoratedKey, Row>> keyIterator = expectedKeys.entrySet().iterator();
+        Iterator<Map.Entry<RowKey, Row>> keyIterator = expectedKeys.entrySet().iterator();
         long position = 0;
 
         Set<String> segments = new HashSet<>();
@@ -110,10 +119,11 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
                 if (!keyIterator.hasNext())
                     break outer;
 
-                Map.Entry<DecoratedKey, Row> key = keyIterator.next();
+                Map.Entry<RowKey, Row> key = keyIterator.next();
 
-                indexWriter.startPartition(key.getKey(), position++);
-                indexWriter.nextUnfilteredCluster(key.getValue());
+                indexWriter.startPartition(key.getKey().decoratedKey, position);
+                indexWriter.nextUnfilteredCluster(key.getValue(), position);
+                position++;
             }
 
             PerSSTableIndexWriter.Index index = indexWriter.getIndex(column);
@@ -134,15 +144,12 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         for (String segment : segments)
             Assert.assertFalse(new File(segment).exists());
 
-        OnDiskIndex index = new OnDiskIndex(new File(indexFile), Int32Type.instance, keyPosition -> {
-            ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, keyPosition));
-            return cfs.metadata.partitioner.decorateKey(key);
-        });
+        OnDiskIndex index = new OnDiskIndex(new File(indexFile), Int32Type.instance, new FakeKeyFetcher(cfs, keyFormat));
 
         Assert.assertEquals(0, UTF8Type.instance.compare(index.minKey(), ByteBufferUtil.bytes(String.format(keyFormat, 0))));
         Assert.assertEquals(0, UTF8Type.instance.compare(index.maxKey(), ByteBufferUtil.bytes(String.format(keyFormat, maxKeys - 1))));
 
-        Set<DecoratedKey> actualKeys = new HashSet<>();
+        Set<RowKey> actualKeys = new HashSet<>();
         int count = 0;
         for (OnDiskIndex.DataTerm term : index)
         {
@@ -150,7 +157,7 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
 
             while (tokens.hasNext())
             {
-                for (DecoratedKey key : tokens.next())
+                for (RowKey key : tokens.next())
                     actualKeys.add(key);
             }
 
@@ -158,8 +165,8 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         }
 
         Assert.assertEquals(expectedKeys.size(), actualKeys.size());
-        for (DecoratedKey key : expectedKeys.keySet())
-            Assert.assertTrue(actualKeys.contains(key));
+        for (RowKey key : expectedKeys.keySet())
+            Assert.assertTrue("Key was not present : " + key, actualKeys.contains(key));
 
         FileUtils.closeQuietly(index);
     }
@@ -183,11 +190,14 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         indexWriter.begin();
         indexWriter.indexes.put(column, indexWriter.newIndex(sasi.getIndex()));
 
-        populateSegment(cfs.metadata, indexWriter.getIndex(column), new HashMap<Long, Set<Integer>>()
+        populateSegment(cfs.metadata, indexWriter.getIndex(column), new HashMap<Long, KeyOffsets>()
         {{
-            put(now,     new HashSet<>(Arrays.asList(0, 1)));
-            put(now + 1, new HashSet<>(Arrays.asList(2, 3)));
-            put(now + 2, new HashSet<>(Arrays.asList(4, 5, 6, 7, 8, 9)));
+            put(now,     new KeyOffsets() {{ put(0, 0); put(1, 1); }});
+            put(now + 1, new KeyOffsets() {{ put(2, 2); put(3, 3); }});
+            put(now + 2, new KeyOffsets() {{
+                put(4, 4); put(5, 5); put(6, 6);
+                put(7, 7); put(8, 8); put(9, 9);
+            }});
         }});
 
         Callable<OnDiskIndex> segmentBuilder = indexWriter.getIndex(column).scheduleSegmentFlush(false);
@@ -197,15 +207,21 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         PerSSTableIndexWriter.Index index = indexWriter.getIndex(column);
         Random random = ThreadLocalRandom.current();
 
+        Supplier<KeyOffsets> offsetSupplier = () -> new KeyOffsets() {{
+            put(random.nextInt(), random.nextInt());
+            put(random.nextInt(), random.nextInt());
+            put(random.nextInt(), random.nextInt());
+        }};
+
         Set<String> segments = new HashSet<>();
         // now let's test multiple correct segments with yield incorrect final segment
         for (int i = 0; i < 3; i++)
         {
-            populateSegment(cfs.metadata, index, new HashMap<Long, Set<Integer>>()
+            populateSegment(cfs.metadata, index, new HashMap<Long, KeyOffsets>()
             {{
-                put(now,     new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(), random.nextInt())));
-                put(now + 1, new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(), random.nextInt())));
-                put(now + 2, new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(), random.nextInt())));
+                put(now,     offsetSupplier.get());
+                put(now + 1, offsetSupplier.get());
+                put(now + 2, offsetSupplier.get());
             }});
 
             try
@@ -236,16 +252,56 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
         Assert.assertFalse(new File(index.outputFile).exists());
     }
 
-    private static void populateSegment(CFMetaData metadata, PerSSTableIndexWriter.Index index, Map<Long, Set<Integer>> data)
+    private static void populateSegment(CFMetaData metadata, PerSSTableIndexWriter.Index index, Map<Long, KeyOffsets> data)
     {
-        for (Map.Entry<Long, Set<Integer>> value : data.entrySet())
+        for (Map.Entry<Long, KeyOffsets> value : data.entrySet())
         {
             ByteBuffer term = LongType.instance.decompose(value.getKey());
-            for (Integer keyPos : value.getValue())
+            for (LongObjectCursor<long[]> cursor : value.getValue())
             {
-                ByteBuffer key = ByteBufferUtil.bytes(String.format("key%06d", keyPos));
-                index.add(term, metadata.partitioner.decorateKey(key), ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1));
+                ByteBuffer key = ByteBufferUtil.bytes(String.format("key%06d", cursor.key));
+                for (long rowOffset : cursor.value)
+                {
+                    index.add(term,
+                              metadata.partitioner.decorateKey(key),
+                              ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1),
+                              ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1));
+                }
             }
         }
     }
+
+    private final class FakeKeyFetcher implements KeyFetcher
+    {
+        private final ColumnFamilyStore cfs;
+        private final String keyFormat;
+
+        public FakeKeyFetcher(ColumnFamilyStore cfs, String keyFormat)
+        {
+            this.cfs = cfs;
+            this.keyFormat = keyFormat;
+        }
+
+        public DecoratedKey getPartitionKey(long keyPosition)
+        {
+            ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, keyPosition));
+            return cfs.metadata.partitioner.decorateKey(key);
+        }
+
+        public Clustering getClustering(long offset)
+        {
+            return Clustering.make(ByteBufferUtil.bytes(offset));
+        }
+
+        public RowKey getRowKey(long partitionOffset, long rowOffset)
+        {
+            return new RowKey(getPartitionKey(partitionOffset), getClustering(rowOffset), CLUSTERING_COMPARATOR);
+        }
+    }
+
+    public IPartitioner getPartitioner()
+    {
+        return Murmur3Partitioner.instance;
+    }
+
 }