You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2016/09/06 05:19:17 UTC
[1/3] cassandra git commit: Add row offset support to SASI
Repository: cassandra
Updated Branches:
refs/heads/trunk 3c95d4731 -> 7d857b46f
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
index 927e165..7c2498c 100644
--- a/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/TokenTreeTest.java
@@ -19,42 +19,31 @@ package org.apache.cassandra.index.sasi.disk;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
+import com.carrotsearch.hppc.cursors.LongObjectCursor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.BufferDecoratedKey;
+import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType;
-import org.apache.cassandra.index.sasi.utils.CombinedTerm;
-import org.apache.cassandra.index.sasi.utils.CombinedValue;
-import org.apache.cassandra.index.sasi.utils.MappedBuffer;
-import org.apache.cassandra.index.sasi.utils.RangeIterator;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.SequentialWriterOption;
-import org.apache.cassandra.utils.MurmurHash;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.index.sasi.utils.*;
+import org.apache.cassandra.io.util.*;
import junit.framework.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.commons.lang3.builder.HashCodeBuilder;
-import com.carrotsearch.hppc.LongOpenHashSet;
-import com.carrotsearch.hppc.LongSet;
-import com.carrotsearch.hppc.cursors.LongCursor;
-import com.google.common.base.Function;
public class TokenTreeTest
{
- private static final Function<Long, DecoratedKey> KEY_CONVERTER = new KeyConverter();
+ private static final ClusteringComparator CLUSTERING_COMPARATOR = new ClusteringComparator(LongType.instance);
@BeforeClass
public static void setupDD()
@@ -62,14 +51,30 @@ public class TokenTreeTest
DatabaseDescriptor.daemonInitialization();
}
- static LongSet singleOffset = new LongOpenHashSet() {{ add(1); }};
- static LongSet bigSingleOffset = new LongOpenHashSet() {{ add(2147521562L); }};
- static LongSet shortPackableCollision = new LongOpenHashSet() {{ add(2L); add(3L); }}; // can pack two shorts
- static LongSet intPackableCollision = new LongOpenHashSet() {{ add(6L); add(((long) Short.MAX_VALUE) + 1); }}; // can pack int & short
- static LongSet multiCollision = new LongOpenHashSet() {{ add(3L); add(4L); add(5L); }}; // can't pack
- static LongSet unpackableCollision = new LongOpenHashSet() {{ add(((long) Short.MAX_VALUE) + 1); add(((long) Short.MAX_VALUE) + 2); }}; // can't pack
+ static KeyOffsets singleOffset = new KeyOffsets() {{ put(1L, KeyOffsets.asArray(10L)); }};
+ static KeyOffsets bigSingleOffset = new KeyOffsets() {{ put(2147521562L, KeyOffsets.asArray(10)); }};
+ static KeyOffsets shortPackableCollision = new KeyOffsets() {{
+ put(2L, KeyOffsets.asArray(10));
+ put(3L, KeyOffsets.asArray(10));
+ }}; // can pack two shorts
+ static KeyOffsets intPackableCollision = new KeyOffsets()
+ {{
+ put(6L, KeyOffsets.asArray(10));
+ put(((long) Short.MAX_VALUE) + 1, KeyOffsets.asArray(10));
+ }}; // can pack int & short
+ static KeyOffsets multiCollision = new KeyOffsets()
+ {{
+ put(3L, KeyOffsets.asArray(10));
+ put(4L, KeyOffsets.asArray(10));
+ put(5L, KeyOffsets.asArray(10));
+ }}; // can't pack
+ static KeyOffsets unpackableCollision = new KeyOffsets()
+ {{
+ put(((long) Short.MAX_VALUE) + 1, KeyOffsets.asArray(10));
+ put(((long) Short.MAX_VALUE) + 2, KeyOffsets.asArray(10));
+ }}; // can't pack
- final static SortedMap<Long, LongSet> simpleTokenMap = new TreeMap<Long, LongSet>()
+ final static SortedMap<Long, KeyOffsets> simpleTokenMap = new TreeMap<Long, KeyOffsets>()
{{
put(1L, bigSingleOffset); put(3L, shortPackableCollision); put(4L, intPackableCollision); put(6L, singleOffset);
put(9L, multiCollision); put(10L, unpackableCollision); put(12L, singleOffset); put(13L, singleOffset);
@@ -81,18 +86,20 @@ public class TokenTreeTest
put(121L, singleOffset); put(122L, singleOffset); put(123L, singleOffset); put(125L, singleOffset);
}};
- final static SortedMap<Long, LongSet> bigTokensMap = new TreeMap<Long, LongSet>()
+ final static SortedMap<Long, KeyOffsets> bigTokensMap = new TreeMap<Long, KeyOffsets>()
{{
for (long i = 0; i < 1000000; i++)
put(i, singleOffset);
}};
- final static SortedMap<Long, LongSet> collidingTokensMap = new TreeMap<Long, LongSet>()
+ final static SortedMap<Long, KeyOffsets> collidingTokensMap = new TreeMap<Long, KeyOffsets>()
{{
- put(1L, singleOffset); put(7L, singleOffset); put(8L, singleOffset);
+ put(1L, singleOffset);
+ put(7L, singleOffset);
+ put(8L, singleOffset);
}};
- final static SortedMap<Long, LongSet> tokens = bigTokensMap;
+ final static SortedMap<Long, KeyOffsets> tokens = bigTokensMap;
final static SequentialWriterOption DEFAULT_OPT = SequentialWriterOption.newBuilder().bufferSize(4096).build();
@@ -139,7 +146,7 @@ public class TokenTreeTest
}
- public void buildSerializeAndIterate(TokenTreeBuilder builder, SortedMap<Long, LongSet> tokenMap) throws Exception
+ public void buildSerializeAndIterate(TokenTreeBuilder builder, SortedMap<Long, KeyOffsets> tokenMap) throws Exception
{
builder.finish();
@@ -155,12 +162,12 @@ public class TokenTreeTest
final RandomAccessReader reader = RandomAccessReader.open(treeFile);
final TokenTree tokenTree = new TokenTree(new MappedBuffer(reader));
- final Iterator<Token> tokenIterator = tokenTree.iterator(KEY_CONVERTER);
- final Iterator<Map.Entry<Long, LongSet>> listIterator = tokenMap.entrySet().iterator();
+ final Iterator<Token> tokenIterator = tokenTree.iterator(KeyConverter.instance);
+ final Iterator<Map.Entry<Long, KeyOffsets>> listIterator = tokenMap.entrySet().iterator();
while (tokenIterator.hasNext() && listIterator.hasNext())
{
Token treeNext = tokenIterator.next();
- Map.Entry<Long, LongSet> listNext = listIterator.next();
+ Map.Entry<Long, KeyOffsets> listNext = listIterator.next();
Assert.assertEquals(listNext.getKey(), treeNext.get());
Assert.assertEquals(convert(listNext.getValue()), convert(treeNext));
@@ -193,15 +200,15 @@ public class TokenTreeTest
for (long i = 0; i <= tokMax; i++)
{
- TokenTree.OnDiskToken result = tokenTree.get(i, KEY_CONVERTER);
+ TokenTree.OnDiskToken result = tokenTree.get(i, KeyConverter.instance);
Assert.assertNotNull("failed to find object for token " + i, result);
- LongSet found = result.getOffsets();
+ KeyOffsets found = result.getOffsets();
Assert.assertEquals(1, found.size());
- Assert.assertEquals(i, found.toArray()[0]);
+ Assert.assertEquals(i, found.iterator().next().key);
}
- Assert.assertNull("found missing object", tokenTree.get(tokMax + 10, KEY_CONVERTER));
+ Assert.assertNull("found missing object", tokenTree.get(tokMax + 10, KeyConverter.instance));
}
@Test
@@ -216,7 +223,7 @@ public class TokenTreeTest
buildSerializeIterateAndSkip(new StaticTokenTreeBuilder(new FakeCombinedTerm(tokens)), tokens);
}
- public void buildSerializeIterateAndSkip(TokenTreeBuilder builder, SortedMap<Long, LongSet> tokens) throws Exception
+ public void buildSerializeIterateAndSkip(TokenTreeBuilder builder, SortedMap<Long, KeyOffsets> tokens) throws Exception
{
builder.finish();
final File treeFile = File.createTempFile("token-tree-iterate-test2", "tt");
@@ -231,7 +238,7 @@ public class TokenTreeTest
final RandomAccessReader reader = RandomAccessReader.open(treeFile);
final TokenTree tokenTree = new TokenTree(new MappedBuffer(reader));
- final RangeIterator<Long, Token> treeIterator = tokenTree.iterator(KEY_CONVERTER);
+ final RangeIterator<Long, Token> treeIterator = tokenTree.iterator(KeyConverter.instance);
final RangeIterator<Long, TokenWithOffsets> listIterator = new EntrySetSkippableIterator(tokens);
long lastToken = 0L;
@@ -275,7 +282,7 @@ public class TokenTreeTest
skipPastEnd(new StaticTokenTreeBuilder(new FakeCombinedTerm(simpleTokenMap)), simpleTokenMap);
}
- public void skipPastEnd(TokenTreeBuilder builder, SortedMap<Long, LongSet> tokens) throws Exception
+ public void skipPastEnd(TokenTreeBuilder builder, SortedMap<Long, KeyOffsets> tokens) throws Exception
{
builder.finish();
final File treeFile = File.createTempFile("token-tree-skip-past-test", "tt");
@@ -288,7 +295,7 @@ public class TokenTreeTest
}
final RandomAccessReader reader = RandomAccessReader.open(treeFile);
- final RangeIterator<Long, Token> tokenTree = new TokenTree(new MappedBuffer(reader)).iterator(KEY_CONVERTER);
+ final RangeIterator<Long, Token> tokenTree = new TokenTree(new MappedBuffer(reader)).iterator(KeyConverter.instance);
tokenTree.skipTo(tokens.lastKey() + 10);
}
@@ -313,8 +320,8 @@ public class TokenTreeTest
TokenTree treeA = generateTree(min, max, isStatic);
TokenTree treeB = generateTree(min, max, isStatic);
- RangeIterator<Long, Token> a = treeA.iterator(new KeyConverter());
- RangeIterator<Long, Token> b = treeB.iterator(new KeyConverter());
+ RangeIterator<Long, Token> a = treeA.iterator(KeyConverter.instance);
+ RangeIterator<Long, Token> b = treeB.iterator(KeyConverter.instance);
long count = min;
while (a.hasNext() && b.hasNext())
@@ -332,7 +339,8 @@ public class TokenTreeTest
// should fail when trying to merge different tokens
try
{
- tokenA.merge(new TokenWithOffsets(tokenA.get() + 1, convert(count)));
+ long l = tokenA.get();
+ tokenA.merge(new TokenWithOffsets(l + 1, convert(count)));
Assert.fail();
}
catch (IllegalArgumentException e)
@@ -341,8 +349,8 @@ public class TokenTreeTest
}
final Set<Long> offsets = new TreeSet<>();
- for (DecoratedKey key : tokenA)
- offsets.add(LongType.instance.compose(key.getKey()));
+ for (RowKey key : tokenA)
+ offsets.add(LongType.instance.compose(key.decoratedKey.getKey()));
Set<Long> expected = new TreeSet<>();
{
@@ -373,7 +381,7 @@ public class TokenTreeTest
testMergingOfEqualTokenTrees(bigTokensMap);
}
- public void testMergingOfEqualTokenTrees(SortedMap<Long, LongSet> tokensMap) throws Exception
+ public void testMergingOfEqualTokenTrees(SortedMap<Long, KeyOffsets> tokensMap) throws Exception
{
TokenTreeBuilder tokensA = new DynamicTokenTreeBuilder(tokensMap);
TokenTreeBuilder tokensB = new DynamicTokenTreeBuilder(tokensMap);
@@ -386,8 +394,8 @@ public class TokenTreeTest
public RangeIterator<Long, Token> getTokenIterator()
{
RangeIterator.Builder<Long, Token> union = RangeUnionIterator.builder();
- union.add(a.iterator(new KeyConverter()));
- union.add(b.iterator(new KeyConverter()));
+ union.add(a.iterator(KeyConverter.instance));
+ union.add(b.iterator(KeyConverter.instance));
return union.build();
}
@@ -395,31 +403,30 @@ public class TokenTreeTest
TokenTree c = buildTree(tokensC);
Assert.assertEquals(tokensMap.size(), c.getCount());
+ Iterator<Token> tokenIterator = c.iterator(KeyConverter.instance);
+ Iterator<Map.Entry<Long, KeyOffsets>> listIterator = tokensMap.entrySet().iterator();
- Iterator<Token> tokenIterator = c.iterator(KEY_CONVERTER);
- Iterator<Map.Entry<Long, LongSet>> listIterator = tokensMap.entrySet().iterator();
while (tokenIterator.hasNext() && listIterator.hasNext())
{
Token treeNext = tokenIterator.next();
- Map.Entry<Long, LongSet> listNext = listIterator.next();
+ Map.Entry<Long, KeyOffsets> listNext = listIterator.next();
Assert.assertEquals(listNext.getKey(), treeNext.get());
Assert.assertEquals(convert(listNext.getValue()), convert(treeNext));
}
- for (Map.Entry<Long, LongSet> entry : tokensMap.entrySet())
+ for (Map.Entry<Long, KeyOffsets> entry : tokensMap.entrySet())
{
- TokenTree.OnDiskToken result = c.get(entry.getKey(), KEY_CONVERTER);
+ TokenTree.OnDiskToken result = c.get(entry.getKey(), KeyConverter.instance);
Assert.assertNotNull("failed to find object for token " + entry.getKey(), result);
-
- LongSet found = result.getOffsets();
+ KeyOffsets found = result.getOffsets();
Assert.assertEquals(entry.getValue(), found);
}
}
- private static TokenTree buildTree(TokenTreeBuilder builder) throws Exception
+ private TokenTree buildTree(TokenTreeBuilder builder) throws Exception
{
builder.finish();
final File treeFile = File.createTempFile("token-tree-", "db");
@@ -437,9 +444,9 @@ public class TokenTreeTest
private static class EntrySetSkippableIterator extends RangeIterator<Long, TokenWithOffsets>
{
- private final PeekingIterator<Map.Entry<Long, LongSet>> elements;
+ private final PeekingIterator<Map.Entry<Long, KeyOffsets>> elements;
- EntrySetSkippableIterator(SortedMap<Long, LongSet> elms)
+ EntrySetSkippableIterator(SortedMap<Long, KeyOffsets> elms)
{
super(elms.firstKey(), elms.lastKey(), elms.size());
elements = Iterators.peekingIterator(elms.entrySet().iterator());
@@ -451,7 +458,7 @@ public class TokenTreeTest
if (!elements.hasNext())
return endOfData();
- Map.Entry<Long, LongSet> next = elements.next();
+ Map.Entry<Long, KeyOffsets> next = elements.next();
return new TokenWithOffsets(next.getKey(), next.getValue());
}
@@ -478,9 +485,9 @@ public class TokenTreeTest
public static class FakeCombinedTerm extends CombinedTerm
{
- private final SortedMap<Long, LongSet> tokens;
+ private final SortedMap<Long, KeyOffsets> tokens;
- public FakeCombinedTerm(SortedMap<Long, LongSet> tokens)
+ public FakeCombinedTerm(SortedMap<Long, KeyOffsets> tokens)
{
super(null, null);
this.tokens = tokens;
@@ -494,9 +501,9 @@ public class TokenTreeTest
public static class TokenMapIterator extends RangeIterator<Long, Token>
{
- public final Iterator<Map.Entry<Long, LongSet>> iterator;
+ public final Iterator<Map.Entry<Long, KeyOffsets>> iterator;
- public TokenMapIterator(SortedMap<Long, LongSet> tokens)
+ public TokenMapIterator(SortedMap<Long, KeyOffsets> tokens)
{
super(tokens.firstKey(), tokens.lastKey(), tokens.size());
iterator = tokens.entrySet().iterator();
@@ -507,7 +514,7 @@ public class TokenTreeTest
if (!iterator.hasNext())
return endOfData();
- Map.Entry<Long, LongSet> entry = iterator.next();
+ Map.Entry<Long, KeyOffsets> entry = iterator.next();
return new TokenWithOffsets(entry.getKey(), entry.getValue());
}
@@ -524,16 +531,16 @@ public class TokenTreeTest
public static class TokenWithOffsets extends Token
{
- private final LongSet offsets;
+ private final KeyOffsets offsets;
- public TokenWithOffsets(long token, final LongSet offsets)
+ public TokenWithOffsets(Long token, final KeyOffsets offsets)
{
super(token);
this.offsets = offsets;
}
@Override
- public LongSet getOffsets()
+ public KeyOffsets getOffsets()
{
return offsets;
}
@@ -571,71 +578,56 @@ public class TokenTreeTest
}
@Override
- public Iterator<DecoratedKey> iterator()
+ public Iterator<RowKey> iterator()
{
- List<DecoratedKey> keys = new ArrayList<>(offsets.size());
- for (LongCursor offset : offsets)
- keys.add(dk(offset.value));
-
+ List<RowKey> keys = new ArrayList<>(offsets.size());
+ for (LongObjectCursor<long[]> offset : offsets)
+ for (long l : offset.value)
+ keys.add(KeyConverter.instance.getRowKey(offset.key, l));
return keys.iterator();
}
}
- private static Set<DecoratedKey> convert(LongSet offsets)
+ private static Set<RowKey> convert(KeyOffsets offsets)
{
- Set<DecoratedKey> keys = new HashSet<>();
- for (LongCursor offset : offsets)
- keys.add(KEY_CONVERTER.apply(offset.value));
+ Set<RowKey> keys = new HashSet<>();
+ for (LongObjectCursor<long[]> offset : offsets)
+ for (long l : offset.value)
+ keys.add(new RowKey(KeyConverter.dk(offset.key),
+ KeyConverter.ck(l),
+ CLUSTERING_COMPARATOR));
return keys;
}
- private static Set<DecoratedKey> convert(Token results)
+ private static Set<RowKey> convert(Token results)
{
- Set<DecoratedKey> keys = new HashSet<>();
- for (DecoratedKey key : results)
+ Set<RowKey> keys = new HashSet<>();
+ for (RowKey key : results)
keys.add(key);
return keys;
}
- private static LongSet convert(long... values)
+ private static KeyOffsets convert(long... values)
{
- LongSet result = new LongOpenHashSet(values.length);
+ KeyOffsets result = new KeyOffsets(values.length);
for (long v : values)
- result.add(v);
+ result.put(v, KeyOffsets.asArray(v + 5));
return result;
}
- private static class KeyConverter implements Function<Long, DecoratedKey>
- {
- @Override
- public DecoratedKey apply(Long offset)
- {
- return dk(offset);
- }
- }
-
- private static DecoratedKey dk(Long token)
- {
- ByteBuffer buf = ByteBuffer.allocate(8);
- buf.putLong(token);
- buf.flip();
- Long hashed = MurmurHash.hash2_64(buf, buf.position(), buf.remaining(), 0);
- return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(hashed), buf);
- }
-
- private static TokenTree generateTree(final long minToken, final long maxToken, boolean isStatic) throws IOException
+ private TokenTree generateTree(final long minToken, final long maxToken, boolean isStatic) throws IOException
{
- final SortedMap<Long, LongSet> toks = new TreeMap<Long, LongSet>()
+ final SortedMap<Long, KeyOffsets> toks = new TreeMap<Long, KeyOffsets>()
{{
- for (long i = minToken; i <= maxToken; i++)
- {
- LongSet offsetSet = new LongOpenHashSet();
- offsetSet.add(i);
- put(i, offsetSet);
- }
+ for (long i = minToken; i <= maxToken; i++)
+ {
+ KeyOffsets offsetSet = new KeyOffsets();
+ offsetSet.put(i, KeyOffsets.asArray(i + 5));
+ put(i, offsetSet);
+ }
}};
final TokenTreeBuilder builder = isStatic ? new StaticTokenTreeBuilder(new FakeCombinedTerm(toks)) : new DynamicTokenTreeBuilder(toks);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java b/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java
index e388cd4..f89dd6c 100644
--- a/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/plan/OperationTest.java
@@ -47,7 +47,7 @@ import org.junit.*;
public class OperationTest extends SchemaLoader
{
- private static final String KS_NAME = "sasi";
+ private static final String KS_NAME = "operation_test";
private static final String CF_NAME = "test_cf";
private static final String CLUSTERING_CF_NAME = "clustering_test_cf";
private static final String STATIC_CF_NAME = "static_sasi_test_cf";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/utils/KeyConverter.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/utils/KeyConverter.java b/test/unit/org/apache/cassandra/index/sasi/utils/KeyConverter.java
new file mode 100644
index 0000000..7de502a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/sasi/utils/KeyConverter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi.utils;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.index.sasi.*;
+import org.apache.cassandra.index.sasi.disk.*;
+import org.apache.cassandra.utils.*;
+
+public class KeyConverter implements KeyFetcher
+{
+ public final static KeyConverter instance = new KeyConverter();
+
+ KeyConverter()
+ {}
+
+ @Override
+ public DecoratedKey getPartitionKey(long offset)
+ {
+ return dk(offset);
+ }
+
+ @Override
+ public Clustering getClustering(long offset)
+ {
+ return ck(offset);
+ }
+
+ @Override
+
+ public RowKey getRowKey(long partitionOffset, long rowOffset)
+ {
+ return new RowKey(getPartitionKey(partitionOffset), getClustering(rowOffset), new ClusteringComparator(LongType.instance));
+ }
+
+ public static DecoratedKey dk(long partitionOffset)
+ {
+ ByteBuffer buf = ByteBuffer.allocate(8);
+ buf.putLong(partitionOffset);
+ buf.flip();
+ Long hashed = MurmurHash.hash2_64(buf, buf.position(), buf.remaining(), 0);
+ return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(hashed), buf);
+ }
+
+ public static Clustering ck(long offset)
+ {
+ return Clustering.make(ByteBufferUtil.bytes(offset));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java b/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java
index 205d28f..4f80a1c 100644
--- a/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java
+++ b/test/unit/org/apache/cassandra/index/sasi/utils/LongIterator.java
@@ -26,6 +26,8 @@ import java.util.List;
import com.carrotsearch.hppc.LongOpenHashSet;
import com.carrotsearch.hppc.LongSet;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.disk.KeyOffsets;
+import org.apache.cassandra.index.sasi.disk.RowKey;
import org.apache.cassandra.index.sasi.disk.Token;
public class LongIterator extends RangeIterator<Long, Token>
@@ -82,13 +84,13 @@ public class LongIterator extends RangeIterator<Long, Token>
}
@Override
- public LongSet getOffsets()
+ public KeyOffsets getOffsets()
{
- return new LongOpenHashSet(4);
+ return new KeyOffsets(4);
}
@Override
- public Iterator<DecoratedKey> iterator()
+ public Iterator<RowKey> iterator()
{
return Collections.emptyIterator();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/utils/RangeUnionIteratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/utils/RangeUnionIteratorTest.java b/test/unit/org/apache/cassandra/index/sasi/utils/RangeUnionIteratorTest.java
index f69086b..4819c0d 100644
--- a/test/unit/org/apache/cassandra/index/sasi/utils/RangeUnionIteratorTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/utils/RangeUnionIteratorTest.java
@@ -31,6 +31,23 @@ import static org.apache.cassandra.index.sasi.utils.LongIterator.convert;
public class RangeUnionIteratorTest
{
@Test
+ public void mergingOfEqualTokensTest()
+ {
+ RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+
+ int size = 1000000;
+ final long[] arr = new long[size];
+ for (int i = 0; i < size; i++)
+ arr[i] = i;
+
+ builder.add(new LongIterator(arr));
+ builder.add(new LongIterator(arr));
+
+ Assert.assertEquals(convert(arr), convert(builder.build()));
+ }
+
+
+ @Test
public void testNoOverlappingValues()
{
RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
[2/3] cassandra git commit: Add row offset support to SASI
Posted by xe...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
index 2210964..07804d6 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
@@ -20,28 +20,62 @@ package org.apache.cassandra.index.sasi.disk;
import java.io.IOException;
import java.util.*;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.obs.BitUtil;
-import com.carrotsearch.hppc.LongSet;
-
-public interface TokenTreeBuilder extends Iterable<Pair<Long, LongSet>>
+public interface TokenTreeBuilder extends Iterable<Pair<Long, KeyOffsets>>
{
- int BLOCK_BYTES = 4096;
- int BLOCK_HEADER_BYTES = 64;
- int OVERFLOW_TRAILER_BYTES = 64;
- int OVERFLOW_TRAILER_CAPACITY = OVERFLOW_TRAILER_BYTES / 8;
- int TOKENS_PER_BLOCK = (BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / 16;
- long MAX_OFFSET = (1L << 47) - 1; // 48 bits for (signed) offset
- byte LAST_LEAF_SHIFT = 1;
- byte SHARED_HEADER_BYTES = 19;
- byte ENTRY_TYPE_MASK = 0x03;
- short AB_MAGIC = 0x5A51;
+ final static int BLOCK_BYTES = 4096;
+
+ final static int LEAF_ENTRY_TYPE_BYTES = Short.BYTES;
+ final static int TOKEN_OFFSET_BYTES = LEAF_ENTRY_TYPE_BYTES;
+ final static int LEAF_PARTITON_OFFSET_BYTES = Long.BYTES;
+ final static int LEAF_ROW_OFFSET_BYTES = Long.BYTES;
+
+ final static int LEAF_PARTITON_OFFSET_PACKED_BYTES = Integer.BYTES;
+ final static int LEAF_ROW_OFFSET_PACKED_BYTES = Integer.BYTES;
+ final static int COLLISION_ENTRY_BYTES = LEAF_PARTITON_OFFSET_BYTES + LEAF_ROW_OFFSET_BYTES;
+
+ final static int HEADER_INFO_BYTE_BYTES = Byte.BYTES;
+ final static int HEADER_TOKEN_COUNT_BYTES = Short.BYTES;
+
+ final static int ROOT_HEADER_MAGIC_SIZE = Short.BYTES;
+ final static int ROOT_HEADER_TOKEN_COUNT_SIZE = Long.BYTES;
+
+ // Partitioner token size in bytes
+ final static int TOKEN_BYTES = Long.BYTES;
+
+ // Leaf entry size in bytes, see {@class SimpleLeafEntry} for a full description
+ final static int LEAF_ENTRY_BYTES = LEAF_ENTRY_TYPE_BYTES + TOKEN_BYTES + LEAF_PARTITON_OFFSET_BYTES + LEAF_ROW_OFFSET_BYTES;
+ // Shared header size in bytes, see {@class AbstractTreeBuilder$Header} for a full description
+ final static int SHARED_HEADER_BYTES = HEADER_INFO_BYTE_BYTES + HEADER_TOKEN_COUNT_BYTES + 2 * TOKEN_BYTES;
+ // Block header size in bytes, see {@class AbstractTreeBuilder$RootHeader}
+ final static int BLOCK_HEADER_BYTES = BitUtil.nextHighestPowerOfTwo(SHARED_HEADER_BYTES + ROOT_HEADER_MAGIC_SIZE + ROOT_HEADER_TOKEN_COUNT_SIZE + 2 * TOKEN_BYTES);
+
+ // Overflow trailer capacity is currently 8 overflow items. Each overflow item consists of two longs.
+ final static int OVERFLOW_TRAILER_CAPACITY = 8;
+ final static int OVERFLOW_TRAILER_BYTES = OVERFLOW_TRAILER_CAPACITY * COLLISION_ENTRY_BYTES;;
+ final static int TOKENS_PER_BLOCK = (TokenTreeBuilder.BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / LEAF_ENTRY_BYTES;
+
+ final static int LEGACY_LEAF_ENTRY_BYTES = Short.BYTES + Short.BYTES + TOKEN_BYTES + Integer.BYTES;
+ final static int LEGACY_TOKEN_OFFSET_BYTES = 2 * Short.BYTES;
+ final static byte LAST_LEAF_SHIFT = 1;
+
+ /**
+ * {@code Header} size in bytes.
+ */
+ final byte ENTRY_TYPE_MASK = 0x03;
+ final short AB_MAGIC = 0x5A51;
+ final short AC_MAGIC = 0x7C63;
// note: ordinal positions are used here, do not change order
enum EntryType
{
- SIMPLE, FACTORED, PACKED, OVERFLOW;
+ SIMPLE,
+ FACTORED,
+ PACKED,
+ OVERFLOW;
public static EntryType of(int ordinal)
{
@@ -61,9 +95,9 @@ public interface TokenTreeBuilder extends Iterable<Pair<Long, LongSet>>
}
}
- void add(Long token, long keyPosition);
- void add(SortedMap<Long, LongSet> data);
- void add(Iterator<Pair<Long, LongSet>> data);
+ void add(Long token, long partitionOffset, long rowOffset);
+ void add(SortedMap<Long, KeyOffsets> data);
+ void add(Iterator<Pair<Long, KeyOffsets>> data);
void add(TokenTreeBuilder ttb);
boolean isEmpty();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
index e55a806..a7b22f3 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
@@ -19,14 +19,14 @@ package org.apache.cassandra.index.sasi.memory;
import java.nio.ByteBuffer;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.*;
import org.apache.cassandra.index.sasi.disk.Token;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.index.sasi.utils.RangeIterator;
import org.apache.cassandra.index.sasi.utils.TypeUtil;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +42,7 @@ public class IndexMemtable
this.index = MemIndex.forColumn(columnIndex.keyValidator(), columnIndex);
}
- public long index(DecoratedKey key, ByteBuffer value)
+ public long index(RowKey key, ByteBuffer value)
{
if (value == null || value.remaining() == 0)
return 0;
@@ -55,7 +55,7 @@ public class IndexMemtable
{
logger.error("Can't add column {} to index for key: {}, value size {}, validator: {}.",
index.columnIndex.getColumnName(),
- index.columnIndex.keyValidator().getString(key.getKey()),
+ index.columnIndex.keyValidator().getString(key.decoratedKey.getKey()),
FBUtilities.prettyPrintMemory(size),
validator);
return 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
index a2f2c0e..b4365dc 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
@@ -18,28 +18,27 @@
package org.apache.cassandra.index.sasi.memory;
import java.io.IOException;
-import java.util.Iterator;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
-import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.index.sasi.disk.*;
import org.apache.cassandra.index.sasi.disk.Token;
import org.apache.cassandra.index.sasi.utils.AbstractIterator;
import org.apache.cassandra.index.sasi.utils.CombinedValue;
import org.apache.cassandra.index.sasi.utils.RangeIterator;
-import com.carrotsearch.hppc.LongOpenHashSet;
-import com.carrotsearch.hppc.LongSet;
import com.google.common.collect.PeekingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class KeyRangeIterator extends RangeIterator<Long, Token>
{
private final DKIterator iterator;
- public KeyRangeIterator(ConcurrentSkipListSet<DecoratedKey> keys)
+ public KeyRangeIterator(ConcurrentSkipListSet<RowKey> keys)
{
- super((Long) keys.first().getToken().getTokenValue(), (Long) keys.last().getToken().getTokenValue(), keys.size());
+ super((Long) keys.first().decoratedKey.getToken().getTokenValue(), (Long) keys.last().decoratedKey.getToken().getTokenValue(), keys.size());
this.iterator = new DKIterator(keys.iterator());
}
@@ -52,8 +51,8 @@ public class KeyRangeIterator extends RangeIterator<Long, Token>
{
while (iterator.hasNext())
{
- DecoratedKey key = iterator.peek();
- if (Long.compare((long) key.getToken().getTokenValue(), nextToken) >= 0)
+ RowKey key = iterator.peek();
+ if (Long.compare((Long) key.decoratedKey.getToken().getTokenValue(), nextToken) >= 0)
break;
// consume smaller key
@@ -64,16 +63,16 @@ public class KeyRangeIterator extends RangeIterator<Long, Token>
public void close() throws IOException
{}
- private static class DKIterator extends AbstractIterator<DecoratedKey> implements PeekingIterator<DecoratedKey>
+ private static class DKIterator extends AbstractIterator<RowKey> implements PeekingIterator<RowKey>
{
- private final Iterator<DecoratedKey> keys;
+ private final Iterator<RowKey> keys;
- public DKIterator(Iterator<DecoratedKey> keys)
+ public DKIterator(Iterator<RowKey> keys)
{
this.keys = keys;
}
- protected DecoratedKey computeNext()
+ protected RowKey computeNext()
{
return keys.hasNext() ? keys.next() : endOfData();
}
@@ -81,25 +80,21 @@ public class KeyRangeIterator extends RangeIterator<Long, Token>
private static class DKToken extends Token
{
- private final SortedSet<DecoratedKey> keys;
+ private final SortedSet<RowKey> keys;
- public DKToken(final DecoratedKey key)
+ public DKToken(RowKey key)
{
- super((long) key.getToken().getTokenValue());
+ super((Long) key.decoratedKey.getToken().getTokenValue());
- keys = new TreeSet<DecoratedKey>(DecoratedKey.comparator)
+ keys = new TreeSet<RowKey>(RowKey.COMPARATOR)
{{
add(key);
}};
}
- public LongSet getOffsets()
+ public KeyOffsets getOffsets()
{
- LongSet offsets = new LongOpenHashSet(4);
- for (DecoratedKey key : keys)
- offsets.add((long) key.getToken().getTokenValue());
-
- return offsets;
+ throw new IllegalStateException("DecoratedKey tokens are used in memtables and do not have on-disk offsets");
}
public void merge(CombinedValue<Long> other)
@@ -116,14 +111,14 @@ public class KeyRangeIterator extends RangeIterator<Long, Token>
}
else
{
- for (DecoratedKey key : o)
+ for (RowKey key : o)
keys.add(key);
}
}
- public Iterator<DecoratedKey> iterator()
+ public Iterator<RowKey> iterator()
{
return keys.iterator();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
index cc1eb3f..bfba4cb 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
@@ -19,8 +19,8 @@ package org.apache.cassandra.index.sasi.memory;
import java.nio.ByteBuffer;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.*;
import org.apache.cassandra.index.sasi.disk.Token;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.index.sasi.utils.RangeIterator;
@@ -37,7 +37,7 @@ public abstract class MemIndex
this.columnIndex = columnIndex;
}
- public abstract long add(DecoratedKey key, ByteBuffer value);
+ public abstract long add(RowKey key, ByteBuffer value);
public abstract RangeIterator<Long, Token> search(Expression expression);
public static MemIndex forColumn(AbstractType<?> keyValidator, ColumnIndex columnIndex)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
index 69b57d0..9c3562a 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
@@ -22,8 +22,8 @@ import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.*;
import org.apache.cassandra.index.sasi.disk.Token;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
@@ -34,7 +34,7 @@ public class SkipListMemIndex extends MemIndex
{
public static final int CSLM_OVERHEAD = 128; // average overhead of CSLM
- private final ConcurrentSkipListMap<ByteBuffer, ConcurrentSkipListSet<DecoratedKey>> index;
+ private final ConcurrentSkipListMap<ByteBuffer, ConcurrentSkipListSet<RowKey>> index;
public SkipListMemIndex(AbstractType<?> keyValidator, ColumnIndex columnIndex)
{
@@ -42,14 +42,14 @@ public class SkipListMemIndex extends MemIndex
index = new ConcurrentSkipListMap<>(columnIndex.getValidator());
}
- public long add(DecoratedKey key, ByteBuffer value)
+ public long add(RowKey key, ByteBuffer value)
{
long overhead = CSLM_OVERHEAD; // DKs are shared
- ConcurrentSkipListSet<DecoratedKey> keys = index.get(value);
+ ConcurrentSkipListSet<RowKey> keys = index.get(value);
if (keys == null)
{
- ConcurrentSkipListSet<DecoratedKey> newKeys = new ConcurrentSkipListSet<>(DecoratedKey.comparator);
+ ConcurrentSkipListSet<RowKey> newKeys = new ConcurrentSkipListSet<>();
keys = index.putIfAbsent(value, newKeys);
if (keys == null)
{
@@ -68,7 +68,7 @@ public class SkipListMemIndex extends MemIndex
ByteBuffer min = expression.lower == null ? null : expression.lower.value;
ByteBuffer max = expression.upper == null ? null : expression.upper.value;
- SortedMap<ByteBuffer, ConcurrentSkipListSet<DecoratedKey>> search;
+ SortedMap<ByteBuffer, ConcurrentSkipListSet<RowKey>> search;
if (min == null && max == null)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
index ca60ac5..e1c273d 100644
--- a/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
@@ -23,9 +23,8 @@ import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.index.sasi.conf.ColumnIndex;
-import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.disk.*;
import org.apache.cassandra.index.sasi.disk.Token;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.index.sasi.plan.Expression.Op;
@@ -38,7 +37,7 @@ import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
import com.googlecode.concurrenttrees.suffix.ConcurrentSuffixTree;
import com.googlecode.concurrenttrees.radix.node.concrete.SmartArrayBasedNodeFactory;
import com.googlecode.concurrenttrees.radix.node.Node;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.*;
import org.slf4j.Logger;
@@ -71,7 +70,7 @@ public class TrieMemIndex extends MemIndex
}
}
- public long add(DecoratedKey key, ByteBuffer value)
+ public long add(RowKey key, ByteBuffer value)
{
AbstractAnalyzer analyzer = columnIndex.getAnalyzer();
analyzer.reset(value.duplicate());
@@ -85,7 +84,7 @@ public class TrieMemIndex extends MemIndex
{
logger.info("Can't add term of column {} to index for key: {}, term size {}, max allowed size {}, use analyzed = true (if not yet set) for that column.",
columnIndex.getColumnName(),
- keyValidator.getString(key.getKey()),
+ keyValidator.getString(key.decoratedKey.getKey()),
FBUtilities.prettyPrintMemory(term.remaining()),
FBUtilities.prettyPrintMemory(OnDiskIndexBuilder.MAX_TERM_SIZE));
continue;
@@ -113,13 +112,13 @@ public class TrieMemIndex extends MemIndex
definition = column;
}
- public long add(String value, DecoratedKey key)
+ public long add(String value, RowKey key)
{
long overhead = CSLM_OVERHEAD;
- ConcurrentSkipListSet<DecoratedKey> keys = get(value);
+ ConcurrentSkipListSet<RowKey> keys = get(value);
if (keys == null)
{
- ConcurrentSkipListSet<DecoratedKey> newKeys = new ConcurrentSkipListSet<>(DecoratedKey.comparator);
+ ConcurrentSkipListSet<RowKey> newKeys = new ConcurrentSkipListSet<>();
keys = putIfAbsent(value, newKeys);
if (keys == null)
{
@@ -141,10 +140,10 @@ public class TrieMemIndex extends MemIndex
{
ByteBuffer prefix = expression.lower == null ? null : expression.lower.value;
- Iterable<ConcurrentSkipListSet<DecoratedKey>> search = search(expression.getOp(), definition.cellValueType().getString(prefix));
+ Iterable<ConcurrentSkipListSet<RowKey>> search = search(expression.getOp(), definition.cellValueType().getString(prefix));
RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
- for (ConcurrentSkipListSet<DecoratedKey> keys : search)
+ for (ConcurrentSkipListSet<RowKey> keys : search)
{
if (!keys.isEmpty())
builder.add(new KeyRangeIterator(keys));
@@ -153,14 +152,14 @@ public class TrieMemIndex extends MemIndex
return builder.build();
}
- protected abstract ConcurrentSkipListSet<DecoratedKey> get(String value);
- protected abstract Iterable<ConcurrentSkipListSet<DecoratedKey>> search(Op operator, String value);
- protected abstract ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> key);
+ protected abstract ConcurrentSkipListSet<RowKey> get(String value);
+ protected abstract Iterable<ConcurrentSkipListSet<RowKey>> search(Op operator, String value);
+ protected abstract ConcurrentSkipListSet<RowKey> putIfAbsent(String value, ConcurrentSkipListSet<RowKey> key);
}
protected static class ConcurrentPrefixTrie extends ConcurrentTrie
{
- private final ConcurrentRadixTree<ConcurrentSkipListSet<DecoratedKey>> trie;
+ private final ConcurrentRadixTree<ConcurrentSkipListSet<RowKey>> trie;
private ConcurrentPrefixTrie(ColumnDefinition column)
{
@@ -168,23 +167,23 @@ public class TrieMemIndex extends MemIndex
trie = new ConcurrentRadixTree<>(NODE_FACTORY);
}
- public ConcurrentSkipListSet<DecoratedKey> get(String value)
+ public ConcurrentSkipListSet<RowKey> get(String value)
{
return trie.getValueForExactKey(value);
}
- public ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> newKeys)
+ public ConcurrentSkipListSet<RowKey> putIfAbsent(String value, ConcurrentSkipListSet<RowKey> newKeys)
{
return trie.putIfAbsent(value, newKeys);
}
- public Iterable<ConcurrentSkipListSet<DecoratedKey>> search(Op operator, String value)
+ public Iterable<ConcurrentSkipListSet<RowKey>> search(Op operator, String value)
{
switch (operator)
{
case EQ:
case MATCH:
- ConcurrentSkipListSet<DecoratedKey> keys = trie.getValueForExactKey(value);
+ ConcurrentSkipListSet<RowKey> keys = trie.getValueForExactKey(value);
return keys == null ? Collections.emptyList() : Collections.singletonList(keys);
case PREFIX:
@@ -198,7 +197,7 @@ public class TrieMemIndex extends MemIndex
protected static class ConcurrentSuffixTrie extends ConcurrentTrie
{
- private final ConcurrentSuffixTree<ConcurrentSkipListSet<DecoratedKey>> trie;
+ private final ConcurrentSuffixTree<ConcurrentSkipListSet<RowKey>> trie;
private ConcurrentSuffixTrie(ColumnDefinition column)
{
@@ -206,23 +205,23 @@ public class TrieMemIndex extends MemIndex
trie = new ConcurrentSuffixTree<>(NODE_FACTORY);
}
- public ConcurrentSkipListSet<DecoratedKey> get(String value)
+ public ConcurrentSkipListSet<RowKey> get(String value)
{
return trie.getValueForExactKey(value);
}
- public ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> newKeys)
+ public ConcurrentSkipListSet<RowKey> putIfAbsent(String value, ConcurrentSkipListSet<RowKey> newKeys)
{
return trie.putIfAbsent(value, newKeys);
}
- public Iterable<ConcurrentSkipListSet<DecoratedKey>> search(Op operator, String value)
+ public Iterable<ConcurrentSkipListSet<RowKey>> search(Op operator, String value)
{
switch (operator)
{
case EQ:
case MATCH:
- ConcurrentSkipListSet<DecoratedKey> keys = trie.getValueForExactKey(value);
+ ConcurrentSkipListSet<RowKey> keys = trie.getValueForExactKey(value);
return keys == null ? Collections.emptyList() : Collections.singletonList(keys);
case SUFFIX:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
index fa1181f..af4e249 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
@@ -17,15 +17,18 @@
*/
package org.apache.cassandra.index.sasi.plan;
+import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.DataLimits;
-import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.sasi.SASIIndex;
@@ -42,10 +45,12 @@ import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.*;
public class QueryController
{
+ private static final Logger logger = LoggerFactory.getLogger(QueryController.class);
+
private final long executionQuota;
private final long executionStart;
@@ -94,22 +99,26 @@ public class QueryController
return index.isPresent() ? ((SASIIndex) index.get()).getIndex() : null;
}
-
- public UnfilteredRowIterator getPartition(DecoratedKey key, ReadExecutionController executionController)
+ public UnfilteredRowIterator getPartition(DecoratedKey key, NavigableSet<Clustering> clusterings, ReadExecutionController executionController)
{
if (key == null)
throw new NullPointerException();
+
try
{
- SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(command.isForThrift(),
- cfs.metadata,
+ ClusteringIndexFilter filter;
+ if (clusterings == null)
+ filter = new ClusteringIndexSliceFilter(Slices.ALL, false);
+ else
+ filter = new ClusteringIndexNamesFilter(clusterings, false);
+
+ SinglePartitionReadCommand partition = SinglePartitionReadCommand.create(cfs.metadata,
command.nowInSec(),
command.columnFilter(),
- command.rowFilter().withoutExpressions(),
+ command.rowFilter(),
DataLimits.NONE,
key,
- command.clusteringIndexFilter(key));
-
+ filter);
return partition.queryMemtableAndDisk(cfs, executionController);
}
finally
@@ -135,20 +144,24 @@ public class QueryController
RangeIterator.Builder<Long, Token> builder = op == OperationType.OR
? RangeUnionIterator.<Long, Token>builder()
- : RangeIntersectionIterator.<Long, Token>builder();
+ : RangeIntersectionIterator.<Long, org.apache.cassandra.index.sasi.disk.Token>builder();
List<RangeIterator<Long, Token>> perIndexUnions = new ArrayList<>();
for (Map.Entry<Expression, Set<SSTableIndex>> e : getView(op, expressions).entrySet())
{
- @SuppressWarnings("resource") // RangeIterators are closed by releaseIndexes
- RangeIterator<Long, Token> index = TermIterator.build(e.getKey(), e.getValue());
-
- if (index == null)
- continue;
+ try (RangeIterator<Long, Token> index = TermIterator.build(e.getKey(), e.getValue()))
+ {
+ if (index == null)
+ continue;
- builder.add(index);
- perIndexUnions.add(index);
+ builder.add(index);
+ perIndexUnions.add(index);
+ }
+ catch (IOException ex)
+ {
+ logger.error("Failed to release index: ", ex);
+ }
}
resources.put(expressions, perIndexUnions);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
index 4410756..ccb369c 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryPlan.java
@@ -19,16 +19,19 @@ package org.apache.cassandra.index.sasi.plan;
import java.util.*;
-import org.apache.cassandra.config.CFMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.index.sasi.disk.*;
import org.apache.cassandra.index.sasi.disk.Token;
-import org.apache.cassandra.index.sasi.plan.Operation.OperationType;
-import org.apache.cassandra.exceptions.RequestTimeoutException;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.index.sasi.plan.Operation.*;
+import org.apache.cassandra.utils.btree.*;
public class QueryPlan
{
@@ -68,14 +71,16 @@ public class QueryPlan
return new ResultIterator(analyze(), controller, executionController);
}
- private static class ResultIterator extends AbstractIterator<UnfilteredRowIterator> implements UnfilteredPartitionIterator
+ private static class ResultIterator implements UnfilteredPartitionIterator
{
private final AbstractBounds<PartitionPosition> keyRange;
private final Operation operationTree;
private final QueryController controller;
private final ReadExecutionController executionController;
- private Iterator<DecoratedKey> currentKeys = null;
+ private Iterator<RowKey> currentKeys = null;
+ private UnfilteredRowIterator nextPartition = null;
+ private DecoratedKey lastPartitionKey = null;
public ResultIterator(Operation operationTree, QueryController controller, ReadExecutionController executionController)
{
@@ -87,53 +92,152 @@ public class QueryPlan
operationTree.skipTo((Long) keyRange.left.getToken().getTokenValue());
}
- protected UnfilteredRowIterator computeNext()
+ public boolean hasNext()
+ {
+ return prepareNext();
+ }
+
+ public UnfilteredRowIterator next()
+ {
+ if (nextPartition == null)
+ prepareNext();
+
+ UnfilteredRowIterator toReturn = nextPartition;
+ nextPartition = null;
+ return toReturn;
+ }
+
+ private boolean prepareNext()
{
if (operationTree == null)
- return endOfData();
+ return false;
+
+ if (nextPartition != null)
+ nextPartition.close();
for (;;)
{
if (currentKeys == null || !currentKeys.hasNext())
{
if (!operationTree.hasNext())
- return endOfData();
+ return false;
Token token = operationTree.next();
currentKeys = token.iterator();
}
- while (currentKeys.hasNext())
+ CFMetaData metadata = controller.metadata();
+ BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(metadata.comparator);
+ // results have static clustering, the whole partition has to be read
+ boolean fetchWholePartition = false;
+
+ while (true)
{
- DecoratedKey key = currentKeys.next();
+ if (!currentKeys.hasNext())
+ {
+ // No more keys for this token.
+ // If no clusterings were collected yet, exit this inner loop so the operation
+ // tree iterator can move on to the next token.
+ // If some clusterings were collected, build an iterator for those rows
+ // and return.
+ if ((clusterings.isEmpty() && !fetchWholePartition) || lastPartitionKey == null)
+ break;
+
+ UnfilteredRowIterator partition = fetchPartition(lastPartitionKey, clusterings.build(), fetchWholePartition);
+ // Prepare for next partition, reset partition key and clusterings
+ lastPartitionKey = null;
+ clusterings = BTreeSet.builder(metadata.comparator);
+
+ if (partition.isEmpty())
+ {
+ partition.close();
+ continue;
+ }
+
+ nextPartition = partition;
+ return true;
+ }
+
+ RowKey fullKey = currentKeys.next();
+ DecoratedKey key = fullKey.decoratedKey;
if (!keyRange.right.isMinimum() && keyRange.right.compareTo(key) < 0)
- return endOfData();
+ return false;
- try (UnfilteredRowIterator partition = controller.getPartition(key, executionController))
+ if (lastPartitionKey != null && metadata.getKeyValidator().compare(lastPartitionKey.getKey(), key.getKey()) != 0)
{
- Row staticRow = partition.staticRow();
- List<Unfiltered> clusters = new ArrayList<>();
+ UnfilteredRowIterator partition = fetchPartition(lastPartitionKey, clusterings.build(), fetchWholePartition);
- while (partition.hasNext())
+ if (partition.isEmpty())
+ partition.close();
+ else
{
- Unfiltered row = partition.next();
- if (operationTree.satisfiedBy(row, staticRow, true))
- clusters.add(row);
+ nextPartition = partition;
+ return true;
}
-
- if (!clusters.isEmpty())
- return new PartitionIterator(partition, clusters);
}
+
+ lastPartitionKey = key;
+
+ // We fetch whole partition for versions before AC and in case static column index is queried in AC
+ if (fullKey.clustering == null || fullKey.clustering.clustering().kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING)
+ fetchWholePartition = true;
+ else
+ clusterings.add(fullKey.clustering);
+
}
}
}
+ private UnfilteredRowIterator fetchPartition(DecoratedKey key, NavigableSet<Clustering> clusterings, boolean fetchWholePartition)
+ {
+ if (fetchWholePartition)
+ clusterings = null;
+
+ try (UnfilteredRowIterator partition = controller.getPartition(key, clusterings, executionController))
+ {
+ Row staticRow = partition.staticRow();
+ List<Unfiltered> clusters = new ArrayList<>();
+
+ while (partition.hasNext())
+ {
+ Unfiltered row = partition.next();
+ if (operationTree.satisfiedBy(row, staticRow, true))
+ clusters.add(row);
+ }
+
+ if (!clusters.isEmpty())
+ return new PartitionIterator(partition, clusters);
+ else
+ return UnfilteredRowIterators.noRowsIterator(partition.metadata(),
+ partition.partitionKey(),
+ Rows.EMPTY_STATIC_ROW,
+ partition.partitionLevelDeletion(),
+ partition.isReverseOrder());
+ }
+ }
+
+ public void close()
+ {
+ if (nextPartition != null)
+ nextPartition.close();
+ }
+
+ public boolean isForThrift()
+ {
+ return controller.isForThrift();
+ }
+
+ public CFMetaData metadata()
+ {
+ return controller.metadata();
+ }
+
private static class PartitionIterator extends AbstractUnfilteredRowIterator
{
private final Iterator<Unfiltered> rows;
- public PartitionIterator(UnfilteredRowIterator partition, Collection<Unfiltered> content)
+ public PartitionIterator(UnfilteredRowIterator partition, Collection<Unfiltered> filteredRows)
{
super(partition.metadata(),
partition.partitionKey(),
@@ -143,7 +247,7 @@ public class QueryPlan
partition.isReverseOrder(),
partition.stats());
- rows = content.iterator();
+ rows = filteredRows.iterator();
}
@Override
@@ -152,21 +256,5 @@ public class QueryPlan
return rows.hasNext() ? rows.next() : endOfData();
}
}
-
- public boolean isForThrift()
- {
- return controller.isForThrift();
- }
-
- public CFMetaData metadata()
- {
- return controller.metadata();
- }
-
- public void close()
- {
- FileUtils.closeQuietly(operationTree);
- controller.finish();
- }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
index f0b6bac..35898aa 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
@@ -46,6 +46,11 @@ public interface SSTableFlushObserver
*
* @param unfilteredCluster The unfiltered cluster being added to SSTable.
*/
+ default void nextUnfilteredCluster(Unfiltered unfilteredCluster, long position)
+ {
+ nextUnfilteredCluster(unfilteredCluster);
+ }
+
void nextUnfilteredCluster(Unfiltered unfilteredCluster);
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 87d2a6e..8442ed7 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -50,8 +50,7 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.rows.EncodingStats;
-import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -63,6 +62,7 @@ import org.apache.cassandra.io.sstable.metadata.*;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.metrics.RestorableMeter;
import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.net.*;
import org.apache.cassandra.schema.CachingParams;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.service.ActiveRepairService;
@@ -1780,6 +1780,35 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
}
/**
+ * Reads Clustering Key from the data file of current sstable.
+ *
+ * @param rowPosition start position of given row in the data file
+ * @return Clustering of the row
+ * @throws IOException
+ */
+ public Clustering clusteringAt(long rowPosition) throws IOException
+ {
+ Clustering clustering;
+ try (FileDataInput in = dfile.createReader(rowPosition))
+ {
+ if (in.isEOF())
+ return null;
+
+ int flags = in.readUnsignedByte();
+ int extendedFlags = UnfilteredSerializer.readExtendedFlags(in, flags);
+ boolean isStatic = UnfilteredSerializer.isStatic(extendedFlags);
+
+ if (isStatic)
+ clustering = Clustering.STATIC_CLUSTERING;
+ else
+ // Since this is an internal call, we don't have to take care of protocol versions that use legacy layout
+ clustering = Clustering.serializer.deserialize(in, MessagingService.VERSION_30, header.clusteringTypes());
+ }
+
+ return clustering;
+ }
+
+ /**
* TODO: Move someplace reusable
*/
public abstract static class Operator
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 5696ecb..93d9822 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -21,9 +21,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -163,7 +161,9 @@ public class BigTableWriter extends SSTableWriter
return null;
long startPosition = beforeAppend(key);
- observers.forEach((o) -> o.startPartition(key, iwriter.indexFile.position()));
+ observers.forEach((o) -> {
+ o.startPartition(key, iwriter.indexFile.position());
+ });
//Reuse the writer for each row
columnIndexWriter.reset();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/utils/obs/BitUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/BitUtil.java b/src/java/org/apache/cassandra/utils/obs/BitUtil.java
index e04de2b..c438d1b 100644
--- a/src/java/org/apache/cassandra/utils/obs/BitUtil.java
+++ b/src/java/org/apache/cassandra/utils/obs/BitUtil.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.utils.obs;
/** A variety of high efficiency bit twiddling routines.
* @lucene.internal
*/
-final class BitUtil
+public final class BitUtil
{
/** Returns the number of bits set in the long */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/data/legacy-sasi/on-disk-sa-int2.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sasi/on-disk-sa-int2.db b/test/data/legacy-sasi/on-disk-sa-int2.db
new file mode 100644
index 0000000..71f662f
Binary files /dev/null and b/test/data/legacy-sasi/on-disk-sa-int2.db differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
index 0b4e9e2..fc5afac 100644
--- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
@@ -76,6 +76,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
+import com.google.common.collect.Sets;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -92,7 +93,7 @@ public class SASIIndexTest
PARTITIONER = Murmur3Partitioner.instance;
}
- private static final String KS_NAME = "sasi";
+ private static final String KS_NAME = "sasi_index_test";
private static final String CF_NAME = "test_cf";
private static final String CLUSTERING_CF_NAME_1 = "clustering_test_cf_1";
private static final String CLUSTERING_CF_NAME_2 = "clustering_test_cf_2";
@@ -448,9 +449,15 @@ public class SASIIndexTest
if (forceFlush)
store.forceBlockingFlush();
- final UntypedResultSet results = executeCQL(FTS_CF_NAME, "SELECT * FROM %s.%s WHERE artist LIKE 'lady%%'");
- Assert.assertNotNull(results);
- Assert.assertEquals(3, results.size());
+ CQLTester.assertRowsIgnoringOrder(executeCQL(FTS_CF_NAME, "SELECT * FROM %s.%s WHERE artist LIKE 'lady%%'"),
+ CQLTester.row(UUID.fromString("1a4abbcd-b5de-4c69-a578-31231e01ff09"), "Lady Gaga", "Poker Face"),
+ CQLTester.row(UUID.fromString("4f8dc18e-54e6-4e16-b507-c5324b61523b"), "Lady Pank", "Zamki na piasku"),
+ CQLTester.row(UUID.fromString("eaf294fa-bad5-49d4-8f08-35ba3636a706"), "Lady Pank", "Koncertowa"));
+
+ CQLTester.assertRowsIgnoringOrder(executeCQL(FTS_CF_NAME, "SELECT artist, title FROM %s.%s WHERE artist LIKE 'lady%%'"),
+ CQLTester.row("Lady Gaga", "Poker Face"),
+ CQLTester.row("Lady Pank", "Zamki na piasku"),
+ CQLTester.row("Lady Pank", "Koncertowa"));
}
@Test
@@ -664,7 +671,7 @@ public class SASIIndexTest
add("key21");
}};
- Assert.assertEquals(expected, convert(uniqueKeys));
+ Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys)));
// now let's test a single equals condition
@@ -690,7 +697,7 @@ public class SASIIndexTest
add("key21");
}};
- Assert.assertEquals(expected, convert(uniqueKeys));
+ Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys)));
// now let's test something which is smaller than a single page
uniqueKeys = getPaged(store, 4,
@@ -704,7 +711,7 @@ public class SASIIndexTest
add("key07");
}};
- Assert.assertEquals(expected, convert(uniqueKeys));
+ Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys)));
// the same but with the page size of 2 to test minimal pagination windows
@@ -712,7 +719,7 @@ public class SASIIndexTest
buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
buildExpression(age, Operator.EQ, Int32Type.instance.decompose(36)));
- Assert.assertEquals(expected, convert(uniqueKeys));
+ Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys)));
// and last but not least, test age range query with pagination
uniqueKeys = getPaged(store, 4,
@@ -736,7 +743,7 @@ public class SASIIndexTest
add("key21");
}};
- Assert.assertEquals(expected, convert(uniqueKeys));
+ Assert.assertEquals(Sets.newHashSet(expected), Sets.newHashSet(convert(uniqueKeys)));
Set<String> rows;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java
new file mode 100644
index 0000000..21ef070
--- /dev/null
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/KeyOffsetsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi.disk;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class KeyOffsetsTest
+{
+ @Test
+ public void testDuplicates()
+ {
+ KeyOffsets offsets = new KeyOffsets();
+ long[] arr = new long[]{ 1, 2, 3, 4, 5, 6 };
+ offsets.put(1, arr);
+ Assert.assertArrayEquals(offsets.get(1), arr);
+ offsets.put(1, arr);
+ Assert.assertArrayEquals(offsets.get(1), arr);
+ for (long l : arr)
+ offsets.put(1, l);
+ Assert.assertArrayEquals(offsets.get(1), arr);
+
+ for (long l : arr)
+ offsets.put(2, l);
+ Assert.assertArrayEquals(offsets.get(2), arr);
+ offsets.put(2, arr);
+ Assert.assertArrayEquals(offsets.get(2), arr);
+ offsets.put(2, arr);
+ Assert.assertArrayEquals(offsets.get(2), arr);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
index 10dc7a8..b56cb4e 100644
--- a/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/OnDiskIndexTest.java
@@ -24,13 +24,16 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.cassandra.config.DatabaseDescriptor;
+import com.carrotsearch.hppc.cursors.LongObjectCursor;
import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.BufferDecoratedKey;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.index.sasi.utils.CombinedTerm;
import org.apache.cassandra.index.sasi.utils.CombinedTermIterator;
+import org.apache.cassandra.index.sasi.utils.KeyConverter;
import org.apache.cassandra.index.sasi.utils.OnDiskIndexIterator;
import org.apache.cassandra.index.sasi.utils.RangeIterator;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -38,13 +41,8 @@ import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.utils.MurmurHash;
import org.apache.cassandra.utils.Pair;
-import com.carrotsearch.hppc.LongSet;
-import com.carrotsearch.hppc.cursors.LongCursor;
-
-import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
@@ -87,7 +85,7 @@ public class OnDiskIndexTest
builder.finish(index);
- OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter());
+ OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance);
// first check if we can find exact matches
for (Map.Entry<ByteBuffer, TokenTreeBuilder> e : data.entrySet())
@@ -95,11 +93,13 @@ public class OnDiskIndexTest
if (UTF8Type.instance.getString(e.getKey()).equals("cat"))
continue; // cat is embedded into scat, we'll test it in next section
- Assert.assertEquals("Key was: " + UTF8Type.instance.compose(e.getKey()), convert(e.getValue()), convert(onDisk.search(expressionFor(UTF8Type.instance, e.getKey()))));
+ Assert.assertEquals("Key was: " + UTF8Type.instance.compose(e.getKey()),
+ convert(e.getValue()),
+ convert(onDisk.search(expressionFor(UTF8Type.instance, e.getKey()))));
}
// check that cat returns positions for scat & cat
- Assert.assertEquals(convert(1, 4), convert(onDisk.search(expressionFor("cat"))));
+ Assert.assertEquals(convert(1L, 4L), convert(onDisk.search(expressionFor("cat"))));
// random suffix queries
Assert.assertEquals(convert(9, 10), convert(onDisk.search(expressionFor("ar"))));
@@ -143,7 +143,7 @@ public class OnDiskIndexTest
builder.finish(index);
- OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter());
+ OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, KeyConverter.instance);
for (Map.Entry<ByteBuffer, TokenTreeBuilder> e : data.entrySet())
{
@@ -224,14 +224,14 @@ public class OnDiskIndexTest
OnDiskIndexBuilder iterTest = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX);
for (int i = 0; i < iterCheckNums.size(); i++)
- iterTest.add(iterCheckNums.get(i), keyAt((long) i), i);
+ iterTest.add(iterCheckNums.get(i), keyAt((long) i), i, i + 5);
File iterIndex = File.createTempFile("sa-iter", ".db");
iterIndex.deleteOnExit();
iterTest.finish(iterIndex);
- onDisk = new OnDiskIndex(iterIndex, Int32Type.instance, new KeyConverter());
+ onDisk = new OnDiskIndex(iterIndex, Int32Type.instance, KeyConverter.instance);
ByteBuffer number = Int32Type.instance.decompose(1);
Assert.assertEquals(0, Iterators.size(onDisk.iteratorAt(number, OnDiskIndex.IteratorOrder.ASC, false)));
@@ -283,7 +283,7 @@ public class OnDiskIndexTest
builder.finish(index);
- OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter());
+ OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance);
Assert.assertEquals(convert(1, 2, 3, 4, 5, 6), convert(onDisk.search(expressionFor("liz"))));
Assert.assertEquals(convert(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), convert(onDisk.search(expressionFor("a"))));
@@ -315,14 +315,14 @@ public class OnDiskIndexTest
final int numIterations = 100000;
for (long i = 0; i < numIterations; i++)
- builder.add(LongType.instance.decompose(start + i), keyAt(i), i);
+ builder.add(LongType.instance.decompose(start + i), keyAt(i), i, clusteringOffset(i));
File index = File.createTempFile("on-disk-sa-sparse", "db");
index.deleteOnExit();
builder.finish(index);
- OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, new KeyConverter());
+ OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, KeyConverter.instance);
ThreadLocalRandom random = ThreadLocalRandom.current();
@@ -343,9 +343,9 @@ public class OnDiskIndexTest
if (upperInclusive)
upperKey += 1;
- Set<DecoratedKey> actual = convert(rows);
+ Set<RowKey> actual = convert(rows);
for (long key = lowerKey; key < upperKey; key++)
- Assert.assertTrue("key" + key + " wasn't found", actual.contains(keyAt(key)));
+ Assert.assertTrue("key" + key + " wasn't found", actual.contains(new RowKey(keyAt(key), ck(clusteringOffset(key)), CLUSTERING_COMPARATOR)));
Assert.assertEquals((upperKey - lowerKey), actual.size());
}
@@ -353,7 +353,7 @@ public class OnDiskIndexTest
// let's also explicitly test whole range search
RangeIterator<Long, Token> rows = onDisk.search(expressionFor(start, true, start + numIterations, true));
- Set<DecoratedKey> actual = convert(rows);
+ Set<RowKey> actual = convert(rows);
Assert.assertEquals(numIterations, actual.size());
}
@@ -380,7 +380,7 @@ public class OnDiskIndexTest
builder.finish(index);
- OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter());
+ OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance);
// test whole words first
Assert.assertEquals(convert(3, 4, 5, 6, 7, 8, 9, 10), convert(onDisk.search(expressionForNot("Aleksey", "Vijay", "Pavel"))));
@@ -424,7 +424,7 @@ public class OnDiskIndexTest
builder.finish(index);
- OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter());
+ OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, KeyConverter.instance);
Assert.assertEquals(convert(1, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12), convert(onDisk.search(expressionForNot(0, 10, 1))));
Assert.assertEquals(convert(1, 2, 4, 5, 7, 9, 10, 11, 12), convert(onDisk.search(expressionForNot(0, 10, 1, 8))));
@@ -439,16 +439,16 @@ public class OnDiskIndexTest
final long lower = 0;
final long upper = 100000;
- OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE);
+ OnDiskIndexBuilder builder = new OnDiskIndexBuilder(LongType.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE);
for (long i = lower; i <= upper; i++)
- builder.add(LongType.instance.decompose(i), keyAt(i), i);
+ builder.add(LongType.instance.decompose(i), keyAt(i), i, clusteringOffset(i));
File index = File.createTempFile("on-disk-sa-except-long-ranges", "db");
index.deleteOnExit();
builder.finish(index);
- OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, new KeyConverter());
+ OnDiskIndex onDisk = new OnDiskIndex(index, LongType.instance, KeyConverter.instance);
ThreadLocalRandom random = ThreadLocalRandom.current();
@@ -503,10 +503,10 @@ public class OnDiskIndexTest
private int validateExclusions(OnDiskIndex sa, long lower, long upper, Set<Long> exclusions, boolean checkCount)
{
int count = 0;
- for (DecoratedKey key : convert(sa.search(rangeWithExclusions(lower, true, upper, true, exclusions))))
+ for (RowKey key : convert(sa.search(rangeWithExclusions(lower, true, upper, true, exclusions))))
{
- String keyId = UTF8Type.instance.getString(key.getKey()).split("key")[1];
- Assert.assertFalse("key" + keyId + " is present.", exclusions.contains(Long.valueOf(keyId)));
+ long keyId = LongType.instance.compose(key.decoratedKey.getKey());
+ Assert.assertFalse("key" + keyId + " is present.", exclusions.contains(keyId));
count++;
}
@@ -519,40 +519,49 @@ public class OnDiskIndexTest
@Test
public void testDescriptor() throws Exception
{
- final Map<ByteBuffer, Pair<DecoratedKey, Long>> data = new HashMap<ByteBuffer, Pair<DecoratedKey, Long>>()
+ final Map<ByteBuffer, Pair<RowKey, Long>> data = new HashMap<ByteBuffer, Pair<RowKey, Long>>()
{{
- put(Int32Type.instance.decompose(5), Pair.create(keyAt(1L), 1L));
+ put(Int32Type.instance.decompose(5), Pair.create(new RowKey(keyAt(1L), ck(clusteringOffset(1L)), CLUSTERING_COMPARATOR) , 1L));
}};
OnDiskIndexBuilder builder1 = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX);
- OnDiskIndexBuilder builder2 = new OnDiskIndexBuilder(UTF8Type.instance, Int32Type.instance, OnDiskIndexBuilder.Mode.PREFIX);
- for (Map.Entry<ByteBuffer, Pair<DecoratedKey, Long>> e : data.entrySet())
+ for (Map.Entry<ByteBuffer, Pair<RowKey, Long>> e : data.entrySet())
{
- DecoratedKey key = e.getValue().left;
+ DecoratedKey key = e.getValue().left.decoratedKey;
Long position = e.getValue().right;
- builder1.add(e.getKey(), key, position);
- builder2.add(e.getKey(), key, position);
+ builder1.add(e.getKey(), key, position, clusteringOffset(position));
}
File index1 = File.createTempFile("on-disk-sa-int", "db");
- File index2 = File.createTempFile("on-disk-sa-int2", "db");
+
index1.deleteOnExit();
- index2.deleteOnExit();
builder1.finish(index1);
- builder2.finish(new Descriptor(Descriptor.VERSION_AA), index2);
+ OnDiskIndex onDisk1 = new OnDiskIndex(index1, Int32Type.instance, KeyConverter.instance);
+ ByteBuffer number = Int32Type.instance.decompose(5);
+ Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk1.search(expressionFor(Operator.EQ, Int32Type.instance, number))));
+ Assert.assertEquals(onDisk1.descriptor.version, Descriptor.CURRENT_VERSION);
+ }
- OnDiskIndex onDisk1 = new OnDiskIndex(index1, Int32Type.instance, new KeyConverter());
- OnDiskIndex onDisk2 = new OnDiskIndex(index2, Int32Type.instance, new KeyConverter());
- ByteBuffer number = Int32Type.instance.decompose(5);
+ static final String DATA_DIR = "test/data/legacy-sasi/";
- Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk1.search(expressionFor(Operator.EQ, Int32Type.instance, number))));
+ @Test
+ public void testLegacyDescriptor() throws Exception
+ {
+ final Map<ByteBuffer, Pair<RowKey, Long>> data = new HashMap<ByteBuffer, Pair<RowKey, Long>>()
+ {{
+ put(Int32Type.instance.decompose(5), Pair.create(new RowKey(keyAt(1L), ck(KeyOffsets.NO_OFFSET), CLUSTERING_COMPARATOR) , 1L));
+ }};
+
+ File index2 = new File(DATA_DIR + "on-disk-sa-int2.db");
+ OnDiskIndex onDisk2 = new OnDiskIndex(index2, Int32Type.instance, KeyConverter.instance);
+
+ ByteBuffer number = Int32Type.instance.decompose(5);
Assert.assertEquals(Collections.singleton(data.get(number).left), convert(onDisk2.search(expressionFor(Operator.EQ, Int32Type.instance, number))));
- Assert.assertEquals(onDisk1.descriptor.version.version, Descriptor.CURRENT_VERSION);
- Assert.assertEquals(onDisk2.descriptor.version.version, Descriptor.VERSION_AA);
+ Assert.assertEquals(onDisk2.descriptor.version, Descriptor.VERSION_AA);
}
@Test
@@ -574,7 +583,7 @@ public class OnDiskIndexTest
builder.finish(index);
- OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, new KeyConverter());
+ OnDiskIndex onDisk = new OnDiskIndex(index, Int32Type.instance, KeyConverter.instance);
OnDiskIndex.OnDiskSuperBlock superBlock = onDisk.dataLevel.getSuperBlock(0);
Iterator<Token> iter = superBlock.iterator();
@@ -595,14 +604,14 @@ public class OnDiskIndexTest
{
OnDiskIndexBuilder builder = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.SPARSE);
for (long i = 0; i < 100000; i++)
- builder.add(LongType.instance.decompose(i), keyAt(i), i);
+ builder.add(LongType.instance.decompose(i), keyAt(i), i, clusteringOffset(i));
File index = File.createTempFile("on-disk-sa-multi-superblock-match", ".db");
index.deleteOnExit();
builder.finish(index);
- OnDiskIndex onDiskIndex = new OnDiskIndex(index, LongType.instance, new KeyConverter());
+ OnDiskIndex onDiskIndex = new OnDiskIndex(index, LongType.instance, KeyConverter.instance);
testSearchRangeWithSuperBlocks(onDiskIndex, 0, 500);
testSearchRangeWithSuperBlocks(onDiskIndex, 300, 93456);
@@ -617,9 +626,9 @@ public class OnDiskIndexTest
}
}
- public void putAll(SortedMap<Long, LongSet> offsets, TokenTreeBuilder ttb)
+ public void putAll(SortedMap<Long, KeyOffsets> offsets, TokenTreeBuilder ttb)
{
- for (Pair<Long, LongSet> entry : ttb)
+ for (Pair<Long, KeyOffsets> entry : ttb)
offsets.put(entry.left, entry.right);
}
@@ -629,26 +638,26 @@ public class OnDiskIndexTest
OnDiskIndexBuilder builderA = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX);
OnDiskIndexBuilder builderB = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX);
- TreeMap<Long, TreeMap<Long, LongSet>> expected = new TreeMap<>();
+ TreeMap<Long, TreeMap<Long, KeyOffsets>> expected = new TreeMap<>();
for (long i = 0; i <= 100; i++)
{
- TreeMap<Long, LongSet> offsets = expected.get(i);
+ TreeMap<Long, KeyOffsets> offsets = expected.get(i);
if (offsets == null)
expected.put(i, (offsets = new TreeMap<>()));
- builderA.add(LongType.instance.decompose(i), keyAt(i), i);
+ builderA.add(LongType.instance.decompose(i), keyAt(i), i, clusteringOffset(i));
putAll(offsets, keyBuilder(i));
}
for (long i = 50; i < 100; i++)
{
- TreeMap<Long, LongSet> offsets = expected.get(i);
+ TreeMap<Long, KeyOffsets> offsets = expected.get(i);
if (offsets == null)
expected.put(i, (offsets = new TreeMap<>()));
long position = 100L + i;
- builderB.add(LongType.instance.decompose(i), keyAt(position), position);
+ builderB.add(LongType.instance.decompose(i), keyAt(position), position, clusteringOffset(position));
putAll(offsets, keyBuilder(100L + i));
}
@@ -661,19 +670,19 @@ public class OnDiskIndexTest
builderA.finish(indexA);
builderB.finish(indexB);
- OnDiskIndex a = new OnDiskIndex(indexA, LongType.instance, new KeyConverter());
- OnDiskIndex b = new OnDiskIndex(indexB, LongType.instance, new KeyConverter());
+ OnDiskIndex a = new OnDiskIndex(indexA, LongType.instance, KeyConverter.instance);
+ OnDiskIndex b = new OnDiskIndex(indexB, LongType.instance, KeyConverter.instance);
RangeIterator<OnDiskIndex.DataTerm, CombinedTerm> union = OnDiskIndexIterator.union(a, b);
- TreeMap<Long, TreeMap<Long, LongSet>> actual = new TreeMap<>();
+ TreeMap<Long, TreeMap<Long, KeyOffsets>> actual = new TreeMap<>();
while (union.hasNext())
{
CombinedTerm term = union.next();
Long composedTerm = LongType.instance.compose(term.getTerm());
- TreeMap<Long, LongSet> offsets = actual.get(composedTerm);
+ TreeMap<Long, KeyOffsets> offsets = actual.get(composedTerm);
if (offsets == null)
actual.put(composedTerm, (offsets = new TreeMap<>()));
@@ -688,7 +697,7 @@ public class OnDiskIndexTest
OnDiskIndexBuilder combined = new OnDiskIndexBuilder(UTF8Type.instance, LongType.instance, OnDiskIndexBuilder.Mode.PREFIX);
combined.finish(Pair.create(keyAt(0).getKey(), keyAt(100).getKey()), indexC, new CombinedTermIterator(a, b));
- OnDiskIndex c = new OnDiskIndex(indexC, LongType.instance, new KeyConverter());
+ OnDiskIndex c = new OnDiskIndex(indexC, LongType.instance, KeyConverter.instance);
union = OnDiskIndexIterator.union(c);
actual.clear();
@@ -698,7 +707,7 @@ public class OnDiskIndexTest
Long composedTerm = LongType.instance.compose(term.getTerm());
- TreeMap<Long, LongSet> offsets = actual.get(composedTerm);
+ TreeMap<Long, KeyOffsets> offsets = actual.get(composedTerm);
if (offsets == null)
actual.put(composedTerm, (offsets = new TreeMap<>()));
@@ -738,7 +747,7 @@ public class OnDiskIndexTest
builder.finish(index);
- OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, new KeyConverter());
+ OnDiskIndex onDisk = new OnDiskIndex(index, UTF8Type.instance, KeyConverter.instance);
// check that lady% return lady gaga (1) and lady pank (3) but not lady of bells(2)
Assert.assertEquals(convert(1, 3), convert(onDisk.search(expressionFor("lady", Operator.LIKE_PREFIX))));
@@ -762,7 +771,7 @@ public class OnDiskIndexTest
while (tokens.hasNext())
{
Token token = tokens.next();
- Iterator<DecoratedKey> keys = token.iterator();
+ Iterator<RowKey> keys = token.iterator();
// each of the values should have exactly a single key
Assert.assertTrue(keys.hasNext());
@@ -771,7 +780,7 @@ public class OnDiskIndexTest
// and it's last should always smaller than current
if (lastToken != null)
- Assert.assertTrue("last should be less than current", lastToken.compareTo(token.get()) < 0);
+ Assert.assertTrue("last should be less than current", lastToken < token.get());
lastToken = token.get();
keyCount++;
@@ -780,61 +789,84 @@ public class OnDiskIndexTest
Assert.assertEquals(end - start, keyCount);
}
- private static DecoratedKey keyAt(long rawKey)
+ private static DecoratedKey keyAt(long partitionOffset)
+ {
+ return KeyConverter.dk(partitionOffset);
+ }
+
+ private static Clustering ck(long rowOffset)
+ {
+ return KeyConverter.ck(rowOffset);
+ }
+
+ private TokenTreeBuilder keyBuilder(long... offsets)
{
- ByteBuffer key = ByteBuffer.wrap(("key" + rawKey).getBytes());
- return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(MurmurHash.hash2_64(key, key.position(), key.remaining(), 0)), key);
+ TokenTreeBuilder builder = new DynamicTokenTreeBuilder();
+
+ for (final long pkOffset : offsets)
+ {
+ DecoratedKey k = keyAt(pkOffset);
+ builder.add((Long) k.getToken().getTokenValue(), pkOffset, clusteringOffset(pkOffset));
+ }
+
+ return builder.finish();
}
- private static TokenTreeBuilder keyBuilder(Long... keys)
+ private static long clusteringOffset(long offset)
+ {
+ return offset + 100;
+ }
+
+ private TokenTreeBuilder keyBuilder(Pair<Long, Long>... offsets)
{
TokenTreeBuilder builder = new DynamicTokenTreeBuilder();
- for (final Long key : keys)
+ for (final Pair<Long,Long> key : offsets)
{
- DecoratedKey dk = keyAt(key);
- builder.add((Long) dk.getToken().getTokenValue(), key);
+ DecoratedKey k = keyAt(key.left);
+ builder.add((Long) k.getToken().getTokenValue(), key.left, key.right);
}
return builder.finish();
}
- private static Set<DecoratedKey> convert(TokenTreeBuilder offsets)
+ private static final ClusteringComparator CLUSTERING_COMPARATOR = new ClusteringComparator(BytesType.instance);
+
+ private static Set<RowKey> convert(TokenTreeBuilder offsets)
{
- Set<DecoratedKey> result = new HashSet<>();
+ Set<RowKey> result = new HashSet<>();
- Iterator<Pair<Long, LongSet>> offsetIter = offsets.iterator();
+ Iterator<Pair<Long, KeyOffsets>> offsetIter = offsets.iterator();
while (offsetIter.hasNext())
{
- LongSet v = offsetIter.next().right;
+ Pair<Long, KeyOffsets> pair = offsetIter.next();
- for (LongCursor offset : v)
- result.add(keyAt(offset.value));
+ for (LongObjectCursor<long[]> cursor : pair.right)
+ for (long l : cursor.value)
+ result.add(new RowKey(keyAt(cursor.key), ck(l), CLUSTERING_COMPARATOR));
}
return result;
}
- private static Set<DecoratedKey> convert(long... keyOffsets)
+ private static Set<RowKey> convert(long... keyOffsets)
{
- Set<DecoratedKey> result = new HashSet<>();
- for (long offset : keyOffsets)
- result.add(keyAt(offset));
+ Set<RowKey> result = new HashSet<>();
+ for (final long offset : keyOffsets)
+ result.add(new RowKey(keyAt(offset), ck(clusteringOffset(offset)), CLUSTERING_COMPARATOR));
return result;
}
- private static Set<DecoratedKey> convert(RangeIterator<Long, Token> results)
+ private static Set<RowKey> convert(RangeIterator<Long, Token> results)
{
if (results == null)
return Collections.emptySet();
- Set<DecoratedKey> keys = new TreeSet<>(DecoratedKey.comparator);
+ Set<RowKey> keys = new TreeSet<>();
while (results.hasNext())
- {
- for (DecoratedKey key : results.next())
+ for (RowKey key: results.next())
keys.add(key);
- }
return keys;
}
@@ -908,19 +940,11 @@ public class OnDiskIndexTest
private static void addAll(OnDiskIndexBuilder builder, ByteBuffer term, TokenTreeBuilder tokens)
{
- for (Pair<Long, LongSet> token : tokens)
- {
- for (long position : token.right.toArray())
- builder.add(term, keyAt(position), position);
- }
- }
-
- private static class KeyConverter implements Function<Long, DecoratedKey>
- {
- @Override
- public DecoratedKey apply(Long offset)
+ for (Pair<Long, KeyOffsets> token : tokens)
{
- return keyAt(offset);
+ for (LongObjectCursor<long[]> cursor : token.right)
+ for (long clusteringOffset : cursor.value)
+ builder.add(term, keyAt(cursor.key), cursor.key, clusteringOffset);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
index f19d962..61e4d67 100644
--- a/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriterTest.java
@@ -22,11 +22,14 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
+import com.carrotsearch.hppc.cursors.LongObjectCursor;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
@@ -35,6 +38,9 @@ import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.index.sasi.KeyFetcher;
import org.apache.cassandra.index.sasi.SASIIndex;
import org.apache.cassandra.index.sasi.utils.RangeIterator;
import org.apache.cassandra.db.marshal.Int32Type;
@@ -70,6 +76,8 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
Tables.of(SchemaLoader.sasiCFMD(KS_NAME, CF_NAME))));
}
+ private static final ClusteringComparator CLUSTERING_COMPARATOR = new ClusteringComparator(LongType.instance);
+
@Test
public void testPartialIndexWrites() throws Exception
{
@@ -86,19 +94,20 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
Descriptor descriptor = Descriptor.fromFilename(cfs.getSSTablePath(directory));
PerSSTableIndexWriter indexWriter = (PerSSTableIndexWriter) sasi.getFlushObserver(descriptor, OperationType.FLUSH);
- SortedMap<DecoratedKey, Row> expectedKeys = new TreeMap<>(DecoratedKey.comparator);
+ SortedMap<RowKey, Row> expectedKeys = new TreeMap<>();
for (int i = 0; i < maxKeys; i++)
{
ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, i));
- expectedKeys.put(cfs.metadata.partitioner.decorateKey(key),
- BTreeRow.singleCellRow(Clustering.EMPTY,
+ Clustering clustering = Clustering.make(ByteBufferUtil.bytes(i * 1L));
+ expectedKeys.put(new RowKey(cfs.metadata.partitioner.decorateKey(key), clustering, CLUSTERING_COMPARATOR),
+ BTreeRow.singleCellRow(clustering,
BufferCell.live(column, timestamp, Int32Type.instance.decompose(i))));
}
indexWriter.begin();
- Iterator<Map.Entry<DecoratedKey, Row>> keyIterator = expectedKeys.entrySet().iterator();
+ Iterator<Map.Entry<RowKey, Row>> keyIterator = expectedKeys.entrySet().iterator();
long position = 0;
Set<String> segments = new HashSet<>();
@@ -110,10 +119,11 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
if (!keyIterator.hasNext())
break outer;
- Map.Entry<DecoratedKey, Row> key = keyIterator.next();
+ Map.Entry<RowKey, Row> key = keyIterator.next();
- indexWriter.startPartition(key.getKey(), position++);
- indexWriter.nextUnfilteredCluster(key.getValue());
+ indexWriter.startPartition(key.getKey().decoratedKey, position);
+ indexWriter.nextUnfilteredCluster(key.getValue(), position);
+ position++;
}
PerSSTableIndexWriter.Index index = indexWriter.getIndex(column);
@@ -134,15 +144,12 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
for (String segment : segments)
Assert.assertFalse(new File(segment).exists());
- OnDiskIndex index = new OnDiskIndex(new File(indexFile), Int32Type.instance, keyPosition -> {
- ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, keyPosition));
- return cfs.metadata.partitioner.decorateKey(key);
- });
+ OnDiskIndex index = new OnDiskIndex(new File(indexFile), Int32Type.instance, new FakeKeyFetcher(cfs, keyFormat));
Assert.assertEquals(0, UTF8Type.instance.compare(index.minKey(), ByteBufferUtil.bytes(String.format(keyFormat, 0))));
Assert.assertEquals(0, UTF8Type.instance.compare(index.maxKey(), ByteBufferUtil.bytes(String.format(keyFormat, maxKeys - 1))));
- Set<DecoratedKey> actualKeys = new HashSet<>();
+ Set<RowKey> actualKeys = new HashSet<>();
int count = 0;
for (OnDiskIndex.DataTerm term : index)
{
@@ -150,7 +157,7 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
while (tokens.hasNext())
{
- for (DecoratedKey key : tokens.next())
+ for (RowKey key : tokens.next())
actualKeys.add(key);
}
@@ -158,8 +165,8 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
}
Assert.assertEquals(expectedKeys.size(), actualKeys.size());
- for (DecoratedKey key : expectedKeys.keySet())
- Assert.assertTrue(actualKeys.contains(key));
+ for (RowKey key : expectedKeys.keySet())
+ Assert.assertTrue("Key was not present : " + key, actualKeys.contains(key));
FileUtils.closeQuietly(index);
}
@@ -183,11 +190,14 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
indexWriter.begin();
indexWriter.indexes.put(column, indexWriter.newIndex(sasi.getIndex()));
- populateSegment(cfs.metadata, indexWriter.getIndex(column), new HashMap<Long, Set<Integer>>()
+ populateSegment(cfs.metadata, indexWriter.getIndex(column), new HashMap<Long, KeyOffsets>()
{{
- put(now, new HashSet<>(Arrays.asList(0, 1)));
- put(now + 1, new HashSet<>(Arrays.asList(2, 3)));
- put(now + 2, new HashSet<>(Arrays.asList(4, 5, 6, 7, 8, 9)));
+ put(now, new KeyOffsets() {{ put(0, 0); put(1, 1); }});
+ put(now + 1, new KeyOffsets() {{ put(2, 2); put(3, 3); }});
+ put(now + 2, new KeyOffsets() {{
+ put(4, 4); put(5, 5); put(6, 6);
+ put(7, 7); put(8, 8); put(9, 9);
+ }});
}});
Callable<OnDiskIndex> segmentBuilder = indexWriter.getIndex(column).scheduleSegmentFlush(false);
@@ -197,15 +207,21 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
PerSSTableIndexWriter.Index index = indexWriter.getIndex(column);
Random random = ThreadLocalRandom.current();
+ Supplier<KeyOffsets> offsetSupplier = () -> new KeyOffsets() {{
+ put(random.nextInt(), random.nextInt());
+ put(random.nextInt(), random.nextInt());
+ put(random.nextInt(), random.nextInt());
+ }};
+
Set<String> segments = new HashSet<>();
// now let's test multiple correct segments with yield incorrect final segment
for (int i = 0; i < 3; i++)
{
- populateSegment(cfs.metadata, index, new HashMap<Long, Set<Integer>>()
+ populateSegment(cfs.metadata, index, new HashMap<Long, KeyOffsets>()
{{
- put(now, new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(), random.nextInt())));
- put(now + 1, new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(), random.nextInt())));
- put(now + 2, new HashSet<>(Arrays.asList(random.nextInt(), random.nextInt(), random.nextInt())));
+ put(now, offsetSupplier.get());
+ put(now + 1, offsetSupplier.get());
+ put(now + 2, offsetSupplier.get());
}});
try
@@ -236,16 +252,56 @@ public class PerSSTableIndexWriterTest extends SchemaLoader
Assert.assertFalse(new File(index.outputFile).exists());
}
- private static void populateSegment(CFMetaData metadata, PerSSTableIndexWriter.Index index, Map<Long, Set<Integer>> data)
+ private static void populateSegment(CFMetaData metadata, PerSSTableIndexWriter.Index index, Map<Long, KeyOffsets> data)
{
- for (Map.Entry<Long, Set<Integer>> value : data.entrySet())
+ for (Map.Entry<Long, KeyOffsets> value : data.entrySet())
{
ByteBuffer term = LongType.instance.decompose(value.getKey());
- for (Integer keyPos : value.getValue())
+ for (LongObjectCursor<long[]> cursor : value.getValue())
{
- ByteBuffer key = ByteBufferUtil.bytes(String.format("key%06d", keyPos));
- index.add(term, metadata.partitioner.decorateKey(key), ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1));
+ ByteBuffer key = ByteBufferUtil.bytes(String.format("key%06d", cursor.key));
+ for (long rowOffset : cursor.value)
+ {
+ index.add(term,
+ metadata.partitioner.decorateKey(key),
+ ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1),
+ ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1));
+ }
}
}
}
+
+ private final class FakeKeyFetcher implements KeyFetcher
+ {
+ private final ColumnFamilyStore cfs;
+ private final String keyFormat;
+
+ public FakeKeyFetcher(ColumnFamilyStore cfs, String keyFormat)
+ {
+ this.cfs = cfs;
+ this.keyFormat = keyFormat;
+ }
+
+ public DecoratedKey getPartitionKey(long keyPosition)
+ {
+ ByteBuffer key = ByteBufferUtil.bytes(String.format(keyFormat, keyPosition));
+ return cfs.metadata.partitioner.decorateKey(key);
+ }
+
+ public Clustering getClustering(long offset)
+ {
+ return Clustering.make(ByteBufferUtil.bytes(offset));
+ }
+
+ public RowKey getRowKey(long partitionOffset, long rowOffset)
+ {
+ return new RowKey(getPartitionKey(partitionOffset), getClustering(rowOffset), CLUSTERING_COMPARATOR);
+ }
+ }
+
+ public IPartitioner getPartitioner()
+ {
+ return Murmur3Partitioner.instance;
+ }
+
}
[3/3] cassandra git commit: Add row offset support to SASI
Posted by xe...@apache.org.
Add row offset support to SASI
Patch by Alex Petrov; reviewed by Pavel Yaskevich for CASSANDRA-11990
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7d857b46
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7d857b46
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7d857b46
Branch: refs/heads/trunk
Commit: 7d857b46fb070548bf5e5f6ff81db588f08ec22a
Parents: 3c95d47
Author: Alex Petrov <ol...@gmail.com>
Authored: Sun Jun 26 18:21:08 2016 +0200
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Mon Sep 5 22:17:11 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnIndex.java | 6 +-
.../apache/cassandra/index/sasi/KeyFetcher.java | 98 +++++++
.../apache/cassandra/index/sasi/SASIIndex.java | 11 +-
.../cassandra/index/sasi/SASIIndexBuilder.java | 13 +-
.../cassandra/index/sasi/SSTableIndex.java | 41 +--
.../cassandra/index/sasi/conf/ColumnIndex.java | 4 +-
.../index/sasi/conf/view/RangeTermTree.java | 4 +
.../sasi/disk/AbstractTokenTreeBuilder.java | 276 ++++++++++--------
.../cassandra/index/sasi/disk/Descriptor.java | 33 ++-
.../sasi/disk/DynamicTokenTreeBuilder.java | 59 ++--
.../cassandra/index/sasi/disk/KeyOffsets.java | 115 ++++++++
.../cassandra/index/sasi/disk/OnDiskIndex.java | 12 +-
.../index/sasi/disk/OnDiskIndexBuilder.java | 16 +-
.../index/sasi/disk/PerSSTableIndexWriter.java | 13 +-
.../cassandra/index/sasi/disk/RowKey.java | 108 +++++++
.../index/sasi/disk/StaticTokenTreeBuilder.java | 18 +-
.../apache/cassandra/index/sasi/disk/Token.java | 10 +-
.../cassandra/index/sasi/disk/TokenTree.java | 288 ++++++++++++-------
.../index/sasi/disk/TokenTreeBuilder.java | 72 +++--
.../index/sasi/memory/IndexMemtable.java | 8 +-
.../index/sasi/memory/KeyRangeIterator.java | 49 ++--
.../cassandra/index/sasi/memory/MemIndex.java | 4 +-
.../index/sasi/memory/SkipListMemIndex.java | 12 +-
.../index/sasi/memory/TrieMemIndex.java | 45 ++-
.../index/sasi/plan/QueryController.java | 49 ++--
.../cassandra/index/sasi/plan/QueryPlan.java | 174 ++++++++---
.../io/sstable/format/SSTableFlushObserver.java | 5 +
.../io/sstable/format/SSTableReader.java | 33 ++-
.../io/sstable/format/big/BigTableWriter.java | 8 +-
.../org/apache/cassandra/utils/obs/BitUtil.java | 2 +-
test/data/legacy-sasi/on-disk-sa-int2.db | Bin 0 -> 12312 bytes
.../cassandra/index/sasi/SASIIndexTest.java | 25 +-
.../index/sasi/disk/KeyOffsetsTest.java | 48 ++++
.../index/sasi/disk/OnDiskIndexTest.java | 216 +++++++-------
.../sasi/disk/PerSSTableIndexWriterTest.java | 112 ++++++--
.../index/sasi/disk/TokenTreeTest.java | 208 +++++++-------
.../index/sasi/plan/OperationTest.java | 2 +-
.../index/sasi/utils/KeyConverter.java | 69 +++++
.../index/sasi/utils/LongIterator.java | 8 +-
.../sasi/utils/RangeUnionIteratorTest.java | 17 ++
41 files changed, 1547 insertions(+), 745 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c466dfe..28c2d84 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.10
+ * Add row offset support to SASI (CASSANDRA-11990)
* Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
* Add keep-alive to streaming (CASSANDRA-11841)
* Tracing payload is passed through newSession(..) (CASSANDRA-11706)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 1116cc2..1f39927 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -121,9 +121,10 @@ public class ColumnIndex
{
Row staticRow = iterator.staticRow();
+ long startPosition = currentPosition();
UnfilteredSerializer.serializer.serializeStaticRow(staticRow, header, writer, version);
if (!observers.isEmpty())
- observers.forEach((o) -> o.nextUnfilteredCluster(staticRow));
+ observers.forEach((o) -> o.nextUnfilteredCluster(staticRow, startPosition));
}
}
@@ -232,6 +233,7 @@ public class ColumnIndex
private void add(Unfiltered unfiltered) throws IOException
{
+ final long origPos = writer.position();
long pos = currentPosition();
if (firstClustering == null)
@@ -245,7 +247,7 @@ public class ColumnIndex
// notify observers about each new row
if (!observers.isEmpty())
- observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered));
+ observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered, origPos));
lastClustering = unfiltered.clustering();
previousRowStart = pos;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java b/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java
new file mode 100644
index 0000000..80ee167
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.index.sasi.disk.*;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.io.sstable.format.*;
+
+
+public interface KeyFetcher
+{
+ public Clustering getClustering(long offset);
+ public DecoratedKey getPartitionKey(long offset);
+
+ public RowKey getRowKey(long partitionOffset, long rowOffset);
+
+ /**
+ * Fetches clustering and partition key from the sstable.
+ *
+ * Currently, clustering key is fetched from the data file of the sstable and partition key is
+ * read from the index file. Reading from index file helps us to warm up key cache in this case.
+ */
+ public static class SSTableKeyFetcher implements KeyFetcher
+ {
+ private final SSTableReader sstable;
+
+ public SSTableKeyFetcher(SSTableReader reader)
+ {
+ sstable = reader;
+ }
+
+ @Override
+ public Clustering getClustering(long offset)
+ {
+ try
+ {
+ return sstable.clusteringAt(offset);
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(new IOException("Failed to read clustering from " + sstable.descriptor, e), sstable.getFilename());
+ }
+ }
+
+ @Override
+ public DecoratedKey getPartitionKey(long offset)
+ {
+ try
+ {
+ return sstable.keyAt(offset);
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(new IOException("Failed to read key from " + sstable.descriptor, e), sstable.getFilename());
+ }
+ }
+
+ @Override
+ public RowKey getRowKey(long partitionOffset, long rowOffset)
+ {
+ if (rowOffset == KeyOffsets.NO_OFFSET)
+ return new RowKey(getPartitionKey(partitionOffset), null, sstable.metadata.comparator);
+ else
+ return new RowKey(getPartitionKey(partitionOffset), getClustering(rowOffset), sstable.metadata.comparator);
+ }
+
+ public int hashCode()
+ {
+ return sstable.descriptor.hashCode();
+ }
+
+ public boolean equals(Object other)
+ {
+ return other instanceof SSTableKeyFetcher
+ && sstable.descriptor.equals(((SSTableKeyFetcher) other).sstable.descriptor);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index 4375964..65953a9 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.index.sasi.conf.ColumnIndex;
import org.apache.cassandra.index.sasi.conf.IndexMode;
import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder.Mode;
import org.apache.cassandra.index.sasi.disk.PerSSTableIndexWriter;
+import org.apache.cassandra.index.sasi.disk.RowKey;
import org.apache.cassandra.index.sasi.plan.QueryPlan;
import org.apache.cassandra.index.transactions.IndexTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -182,6 +183,14 @@ public class SASIIndex implements Index, INotificationConsumer
return getTruncateTask(FBUtilities.timestampMicros());
}
+ public Callable<?> getTruncateTask(Collection<SSTableReader> sstablesToRebuild)
+ {
+ return () -> {
+ index.dropData(sstablesToRebuild);
+ return null;
+ };
+ }
+
public Callable<?> getTruncateTask(long truncatedAt)
{
return () -> {
@@ -252,7 +261,7 @@ public class SASIIndex implements Index, INotificationConsumer
public void insertRow(Row row)
{
if (isNewData())
- adjustMemtableSize(index.index(key, row), opGroup);
+ adjustMemtableSize(index.index(new RowKey(key, row.clustering(), baseCfs.getComparator()), row), opGroup);
}
public void updateRow(Row oldRow, Row newRow)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
index d50875a..d6706ea 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
@@ -94,16 +94,23 @@ class SASIIndexBuilder extends SecondaryIndexBuilder
{
RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
dataFile.seek(indexEntry.position);
- ByteBufferUtil.readWithShortLength(dataFile); // key
+ int staticOffset = ByteBufferUtil.readWithShortLength(dataFile).remaining(); // key
try (SSTableIdentityIterator partition = SSTableIdentityIterator.create(sstable, dataFile, key))
{
// if the row has statics attached, it has to be indexed separately
if (cfs.metadata.hasStaticColumns())
- indexWriter.nextUnfilteredCluster(partition.staticRow());
+ {
+ long staticPosition = indexEntry.position + staticOffset;
+ indexWriter.nextUnfilteredCluster(partition.staticRow(), staticPosition);
+ }
+ long position = dataFile.getPosition();
while (partition.hasNext())
- indexWriter.nextUnfilteredCluster(partition.next());
+ {
+ indexWriter.nextUnfilteredCluster(partition.next(), position);
+ position = dataFile.getPosition();
+ }
}
}
catch (IOException ex)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
index c67c39c..f9c8abf 100644
--- a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
@@ -18,28 +18,22 @@
package org.apache.cassandra.index.sasi;
import java.io.File;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.index.sasi.conf.ColumnIndex;
-import org.apache.cassandra.index.sasi.disk.OnDiskIndex;
-import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.disk.*;
import org.apache.cassandra.index.sasi.disk.Token;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.index.sasi.utils.RangeIterator;
-import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.commons.lang3.builder.HashCodeBuilder;
-import com.google.common.base.Function;
-
public class SSTableIndex
{
private final ColumnIndex columnIndex;
@@ -65,7 +59,7 @@ public class SSTableIndex
sstable.getFilename(),
columnIndex.getIndexName());
- this.index = new OnDiskIndex(indexFile, validator, new DecoratedKeyFetcher(sstable));
+ this.index = new OnDiskIndex(indexFile, validator, new KeyFetcher.SSTableKeyFetcher(sstable));
}
public OnDiskIndexBuilder.Mode mode()
@@ -163,36 +157,5 @@ public class SSTableIndex
return String.format("SSTableIndex(column: %s, SSTable: %s)", columnIndex.getColumnName(), sstable.descriptor);
}
- private static class DecoratedKeyFetcher implements Function<Long, DecoratedKey>
- {
- private final SSTableReader sstable;
-
- DecoratedKeyFetcher(SSTableReader reader)
- {
- sstable = reader;
- }
-
- public DecoratedKey apply(Long offset)
- {
- try
- {
- return sstable.keyAt(offset);
- }
- catch (IOException e)
- {
- throw new FSReadError(new IOException("Failed to read key from " + sstable.descriptor, e), sstable.getFilename());
- }
- }
-
- public int hashCode()
- {
- return sstable.descriptor.hashCode();
- }
- public boolean equals(Object other)
- {
- return other instanceof DecoratedKeyFetcher
- && sstable.descriptor.equals(((DecoratedKeyFetcher) other).sstable.descriptor);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
index 0958113..459e5c3 100644
--- a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
@@ -30,7 +30,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.AsciiType;
@@ -40,6 +39,7 @@ import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
import org.apache.cassandra.index.sasi.conf.view.View;
import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.disk.RowKey;
import org.apache.cassandra.index.sasi.disk.Token;
import org.apache.cassandra.index.sasi.memory.IndexMemtable;
import org.apache.cassandra.index.sasi.plan.Expression;
@@ -99,7 +99,7 @@ public class ColumnIndex
return keyValidator;
}
- public long index(DecoratedKey key, Row row)
+ public long index(RowKey key, Row row)
{
return getCurrentMemtable().index(key, getValueOf(column, row, FBUtilities.nowInSeconds()));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
index d6b4551..2775c29 100644
--- a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
+++ b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.index.sasi.conf.view;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -46,6 +47,9 @@ public class RangeTermTree implements TermTree
public Set<SSTableIndex> search(Expression e)
{
+ if (e == null)
+ return Collections.emptySet();
+
ByteBuffer minTerm = e.lower == null ? min : e.lower.value;
ByteBuffer maxTerm = e.upper == null ? max : e.upper.value;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
index 18994de..9245960 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
@@ -20,19 +20,18 @@ package org.apache.cassandra.index.sasi.disk;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
+import java.util.function.*;
+import com.carrotsearch.hppc.LongArrayList;
+import com.carrotsearch.hppc.cursors.LongCursor;
+import com.carrotsearch.hppc.cursors.LongObjectCursor;
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
-import com.carrotsearch.hppc.LongArrayList;
-import com.carrotsearch.hppc.LongSet;
-import com.carrotsearch.hppc.cursors.LongCursor;
-
public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
{
protected int numBlocks;
@@ -65,7 +64,7 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
public int serializedSize()
{
if (numBlocks == 1)
- return (BLOCK_HEADER_BYTES + ((int) tokenCount * 16));
+ return BLOCK_HEADER_BYTES + ((int) tokenCount * LEAF_ENTRY_BYTES);
else
return numBlocks * BLOCK_BYTES;
}
@@ -112,6 +111,15 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
buffer.clear();
}
+ /**
+ * Tree node.
+ *
+ * B+-tree consists of root, interior nodes and leaves. Root can be either a node or a leaf.
+ *
+ * Depending on the concrete implementation of {@code TokenTreeBuilder},
+ * a leaf can be partial or static (in the case of {@code StaticTokenTreeBuilder})
+ * or dynamic (in the case of {@code DynamicTokenTreeBuilder}).
+ */
protected abstract class Node
{
protected InteriorNode parent;
@@ -179,8 +187,16 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
alignBuffer(buf, BLOCK_HEADER_BYTES);
}
+ /**
+ * Shared header part, written for all node types:
+ * [ info byte ] [ token count ] [ min node token ] [ max node token ]
+ * [ 1b ] [ 2b (short) ] [ 8b (long) ] [ 8b (long) ]
+ **/
private abstract class Header
{
+ /**
+ * Serializes the shared part of the header
+ */
public void serialize(ByteBuffer buf)
{
buf.put(infoByte())
@@ -192,6 +208,12 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
protected abstract byte infoByte();
}
+ /**
+ * In addition to shared header part, root header stores version information,
+ * overall token count and min/max tokens for the whole tree:
+ * [ magic ] [ overall token count ] [ min tree token ] [ max tree token ]
+ * [ 2b (short) ] [ 8b (long) ] [ 8b (long) ] [ 8b (long) ]
+ */
private class RootHeader extends Header
{
public void serialize(ByteBuffer buf)
@@ -207,19 +229,21 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
{
// if leaf, set leaf indicator and last leaf indicator (bits 0 & 1)
// if not leaf, clear both bits
- return (byte) ((isLeaf()) ? 3 : 0);
+ return isLeaf() ? ENTRY_TYPE_MASK : 0;
}
protected void writeMagic(ByteBuffer buf)
{
switch (Descriptor.CURRENT_VERSION)
{
- case Descriptor.VERSION_AB:
+ case ab:
buf.putShort(AB_MAGIC);
break;
-
- default:
+ case ac:
+ buf.putShort(AC_MAGIC);
break;
+ default:
+ throw new RuntimeException("Unsupported version");
}
}
@@ -249,6 +273,12 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
}
+ /**
+ * Leaf consists of
+ * - header (format described in {@code Header})
+ * - data (format described in {@code LeafEntry})
+ * - overflow collision entries, holding up to {@value OVERFLOW_TRAILER_CAPACITY} {@code RowOffset} entries.
+ */
protected abstract class Leaf extends Node
{
protected LongArrayList overflowCollisions;
@@ -279,82 +309,98 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
protected abstract void serializeData(ByteBuffer buf);
- protected LeafEntry createEntry(final long tok, final LongSet offsets)
+ protected LeafEntry createEntry(final long tok, final KeyOffsets offsets)
{
- int offsetCount = offsets.size();
+ LongArrayList rawOffsets = new LongArrayList(offsets.size());
+
+ offsets.forEach(new Consumer<LongObjectCursor<long[]>>()
+ {
+ public void accept(LongObjectCursor<long[]> cursor)
+ {
+ for (long l : cursor.value)
+ {
+ rawOffsets.add(cursor.key);
+ rawOffsets.add(l);
+ }
+ }
+ });
+
+ int offsetCount = rawOffsets.size();
switch (offsetCount)
{
case 0:
throw new AssertionError("no offsets for token " + tok);
- case 1:
- long offset = offsets.toArray()[0];
- if (offset > MAX_OFFSET)
- throw new AssertionError("offset " + offset + " cannot be greater than " + MAX_OFFSET);
- else if (offset <= Integer.MAX_VALUE)
- return new SimpleLeafEntry(tok, offset);
- else
- return new FactoredOffsetLeafEntry(tok, offset);
case 2:
- long[] rawOffsets = offsets.toArray();
- if (rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= Integer.MAX_VALUE &&
- (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= Short.MAX_VALUE))
- return new PackedCollisionLeafEntry(tok, rawOffsets);
- else
- return createOverflowEntry(tok, offsetCount, offsets);
+ return new SimpleLeafEntry(tok, rawOffsets.get(0), rawOffsets.get(1));
default:
- return createOverflowEntry(tok, offsetCount, offsets);
+ assert offsetCount % 2 == 0;
+ if (offsetCount == 4)
+ {
+ if (rawOffsets.get(0) < Integer.MAX_VALUE && rawOffsets.get(1) < Integer.MAX_VALUE &&
+ rawOffsets.get(2) < Integer.MAX_VALUE && rawOffsets.get(3) < Integer.MAX_VALUE)
+ {
+ return new PackedCollisionLeafEntry(tok, (int)rawOffsets.get(0), (int) rawOffsets.get(1),
+ (int) rawOffsets.get(2), (int) rawOffsets.get(3));
+ }
+ }
+ return createOverflowEntry(tok, offsetCount, rawOffsets);
}
}
- private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongSet offsets)
+ private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongArrayList offsets)
{
if (overflowCollisions == null)
- overflowCollisions = new LongArrayList();
+ overflowCollisions = new LongArrayList(offsetCount);
- LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) overflowCollisions.size(), (short) offsetCount);
- for (LongCursor o : offsets)
- {
- if (overflowCollisions.size() == OVERFLOW_TRAILER_CAPACITY)
- throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf");
- else
- overflowCollisions.add(o.value);
- }
+ int overflowCount = (overflowCollisions.size() + offsetCount) / 2;
+ if (overflowCount >= OVERFLOW_TRAILER_CAPACITY)
+ throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf, but had: " + overflowCount);
+
+ LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) (overflowCollisions.size() / 2), (short) (offsetCount / 2));
+ overflowCollisions.addAll(offsets);
return entry;
}
+ /**
+ * A leaf of the B+-Tree, that holds information about the row offset(s) for
+ * the current token.
+ *
+ * Main 3 types of leaf entries are:
+ * 1) simple leaf entry: holding just a single row offset
+ * 2) packed collision leaf entry: holding two entries that would fit together into 16 bytes
+ * 3) overflow entry: only holds offset in overflow trailer and amount of entries belonging to this leaf
+ */
protected abstract class LeafEntry
{
protected final long token;
abstract public EntryType type();
- abstract public int offsetData();
- abstract public short offsetExtra();
public LeafEntry(final long tok)
{
token = tok;
}
- public void serialize(ByteBuffer buf)
- {
- buf.putShort((short) type().ordinal())
- .putShort(offsetExtra())
- .putLong(token)
- .putInt(offsetData());
- }
+ public abstract void serialize(ByteBuffer buf);
}
-
- // assumes there is a single offset and the offset is <= Integer.MAX_VALUE
+ /**
+ * Simple leaf that can store a single row offset, having the following format:
+ *
+ * [ type ] [ token ] [ partition offset ] [ row offset ]
+ * [ 2b (short) ] [ 8b (long) ] [ 8b (long) ] [ 8b (long) ]
+ */
protected class SimpleLeafEntry extends LeafEntry
{
- private final long offset;
+ private final long partitionOffset;
+ private final long rowOffset;
- public SimpleLeafEntry(final long tok, final long off)
+ public SimpleLeafEntry(final long tok, final long partitionOffset, final long rowOffset)
{
super(tok);
- offset = off;
+ this.partitionOffset = partitionOffset;
+ this.rowOffset = rowOffset;
}
public EntryType type()
@@ -362,61 +408,38 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
return EntryType.SIMPLE;
}
- public int offsetData()
- {
- return (int) offset;
- }
-
- public short offsetExtra()
- {
- return 0;
- }
- }
-
- // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET
- // take the middle 32 bits of offset (or the top 32 when considering offset is max 48 bits)
- // and store where offset is normally stored. take bottom 16 bits of offset and store in entry header
- private class FactoredOffsetLeafEntry extends LeafEntry
- {
- private final long offset;
-
- public FactoredOffsetLeafEntry(final long tok, final long off)
- {
- super(tok);
- offset = off;
- }
-
- public EntryType type()
- {
- return EntryType.FACTORED;
- }
-
- public int offsetData()
- {
- return (int) (offset >>> Short.SIZE);
- }
-
- public short offsetExtra()
+ @Override
+ public void serialize(ByteBuffer buf)
{
- // exta offset is supposed to be an unsigned 16-bit integer
- return (short) offset;
+ buf.putShort((short) type().ordinal())
+ .putLong(token)
+ .putLong(partitionOffset)
+ .putLong(rowOffset);
}
}
- // holds an entry with two offsets that can be packed in an int & a short
- // the int offset is stored where offset is normally stored. short offset is
- // stored in entry header
- private class PackedCollisionLeafEntry extends LeafEntry
+ /**
+ * Packed collision entry; can store two offsets, if each one of their positions
+ * fits into 4 bytes.
+ * [ type ] [ token ] [ partition offset 1 ] [ row offset 1 ] [ partition offset 2 ] [ row offset 2 ]
+ * [ 2b (short) ] [ 8b (long) ] [ 4b (int) ] [ 4b (int) ] [ 4b (int) ] [ 4b (int) ]
+ */
+ protected class PackedCollisionLeafEntry extends LeafEntry
{
- private short smallerOffset;
- private int largerOffset;
+ private final int partitionOffset1;
+ private final int rowOffset1;
+ private final int partitionOffset2;
+ private final int rowOffset2;
- public PackedCollisionLeafEntry(final long tok, final long[] offs)
+ public PackedCollisionLeafEntry(final long tok, final int partitionOffset1, final int rowOffset1,
+ final int partitionOffset2, final int rowOffset2)
{
super(tok);
+ this.partitionOffset1 = partitionOffset1;
+ this.rowOffset1 = rowOffset1;
+ this.partitionOffset2 = partitionOffset2;
+ this.rowOffset2 = rowOffset2;
- smallerOffset = (short) Math.min(offs[0], offs[1]);
- largerOffset = (int) Math.max(offs[0], offs[1]);
}
public EntryType type()
@@ -424,21 +447,27 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
return EntryType.PACKED;
}
- public int offsetData()
- {
- return largerOffset;
- }
-
- public short offsetExtra()
+ @Override
+ public void serialize(ByteBuffer buf)
{
- return smallerOffset;
- }
- }
-
- // holds an entry with three or more offsets, or two offsets that cannot
- // be packed into an int & a short. the index into the overflow list
- // is stored where the offset is normally stored. the number of overflowed offsets
- // for the entry is stored in the entry header
+ buf.putShort((short) type().ordinal())
+ .putLong(token)
+ .putInt(partitionOffset1)
+ .putInt(rowOffset1)
+ .putInt(partitionOffset2)
+ .putInt(rowOffset2);
+ }
+ }
+
+ /**
+ * Overflow collision entry, holds an entry with three or more offsets, or two offsets
+ * that cannot be packed into 16 bytes.
+ * [ type ] [ token ] [ start index ] [ count ]
+ * [ 2b (short) ] [ 8b (long) ] [ 8b (long) ] [ 8b (long) ]
+ *
+ * - [ start index ] is a position of first item belonging to this leaf entry in the overflow trailer
+ * - [ count ] is the amount of items belonging to this leaf entry that are stored in the overflow trailer
+ */
private class OverflowCollisionLeafEntry extends LeafEntry
{
private final short startIndex;
@@ -456,20 +485,23 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
return EntryType.OVERFLOW;
}
- public int offsetData()
- {
- return startIndex;
- }
-
- public short offsetExtra()
+ @Override
+ public void serialize(ByteBuffer buf)
{
- return count;
+ buf.putShort((short) type().ordinal())
+ .putLong(token)
+ .putLong(startIndex)
+ .putLong(count);
}
-
}
-
}
+ /**
+ * Interior node consists of:
+ * - (interior node) header
+ * - tokens (serialized as longs, with count stored in header)
+ * - child offsets
+ */
protected class InteriorNode extends Node
{
protected List<Long> tokens = new ArrayList<>(TOKENS_PER_BLOCK);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
index 3aa6f14..3fa0f06 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
@@ -22,30 +22,29 @@ package org.apache.cassandra.index.sasi.disk;
*/
public class Descriptor
{
- public static final String VERSION_AA = "aa";
- public static final String VERSION_AB = "ab";
- public static final String CURRENT_VERSION = VERSION_AB;
- public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION);
-
- public static class Version
+ public static enum Version
{
- public final String version;
+ aa,
+ ab,
+ ac
+ }
- public Version(String version)
- {
- this.version = version;
- }
+ public static final Version VERSION_AA = Version.aa;
+ public static final Version VERSION_AB = Version.ab;
+ public static final Version VERSION_AC = Version.ac;
- public String toString()
- {
- return version;
- }
- }
+ public static final Version CURRENT_VERSION = Version.ac;
+ public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION);
public final Version version;
public Descriptor(String v)
{
- this.version = new Version(v);
+ this.version = Version.valueOf(v);
+ }
+
+ public Descriptor(Version v)
+ {
+ this.version = v;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
index 2ddfd89..6e3e163 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
@@ -20,17 +20,14 @@ package org.apache.cassandra.index.sasi.disk;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.cassandra.utils.AbstractIterator;
-import org.apache.cassandra.utils.Pair;
-
-import com.carrotsearch.hppc.LongOpenHashSet;
-import com.carrotsearch.hppc.LongSet;
-import com.carrotsearch.hppc.cursors.LongCursor;
+import com.carrotsearch.hppc.cursors.LongObjectCursor;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.utils.*;
public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
{
- private final SortedMap<Long, LongSet> tokens = new TreeMap<>();
+ private final SortedMap<Long, KeyOffsets> tokens = new TreeMap<>();
public DynamicTokenTreeBuilder()
{}
@@ -40,54 +37,52 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
add(data);
}
- public DynamicTokenTreeBuilder(SortedMap<Long, LongSet> data)
+ public DynamicTokenTreeBuilder(SortedMap<Long, KeyOffsets> data)
{
add(data);
}
- public void add(Long token, long keyPosition)
+ public void add(Long token, long partitionOffset, long rowOffset)
{
- LongSet found = tokens.get(token);
+ KeyOffsets found = tokens.get(token);
if (found == null)
- tokens.put(token, (found = new LongOpenHashSet(2)));
+ tokens.put(token, (found = new KeyOffsets(2)));
- found.add(keyPosition);
+ found.put(partitionOffset, rowOffset);
}
- public void add(Iterator<Pair<Long, LongSet>> data)
+ public void add(Iterator<Pair<Long, KeyOffsets>> data)
{
while (data.hasNext())
{
- Pair<Long, LongSet> entry = data.next();
- for (LongCursor l : entry.right)
- add(entry.left, l.value);
+ Pair<Long, KeyOffsets> entry = data.next();
+ for (LongObjectCursor<long[]> cursor : entry.right)
+ for (long l : cursor.value)
+ add(entry.left, cursor.key, l);
}
}
- public void add(SortedMap<Long, LongSet> data)
+ public void add(SortedMap<Long, KeyOffsets> data)
{
- for (Map.Entry<Long, LongSet> newEntry : data.entrySet())
+ for (Map.Entry<Long, KeyOffsets> newEntry : data.entrySet())
{
- LongSet found = tokens.get(newEntry.getKey());
- if (found == null)
- tokens.put(newEntry.getKey(), (found = new LongOpenHashSet(4)));
-
- for (LongCursor offset : newEntry.getValue())
- found.add(offset.value);
+ for (LongObjectCursor<long[]> cursor : newEntry.getValue())
+ for (long l : cursor.value)
+ add(newEntry.getKey(), cursor.key, l);
}
}
- public Iterator<Pair<Long, LongSet>> iterator()
+ public Iterator<Pair<Long, KeyOffsets>> iterator()
{
- final Iterator<Map.Entry<Long, LongSet>> iterator = tokens.entrySet().iterator();
- return new AbstractIterator<Pair<Long, LongSet>>()
+ final Iterator<Map.Entry<Long, KeyOffsets>> iterator = tokens.entrySet().iterator();
+ return new AbstractIterator<Pair<Long, KeyOffsets>>()
{
- protected Pair<Long, LongSet> computeNext()
+ protected Pair<Long, KeyOffsets> computeNext()
{
if (!iterator.hasNext())
return endOfData();
- Map.Entry<Long, LongSet> entry = iterator.next();
+ Map.Entry<Long, KeyOffsets> entry = iterator.next();
return Pair.create(entry.getKey(), entry.getValue());
}
};
@@ -161,9 +156,9 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
private class DynamicLeaf extends Leaf
{
- private final SortedMap<Long, LongSet> tokens;
+ private final SortedMap<Long, KeyOffsets> tokens;
- DynamicLeaf(SortedMap<Long, LongSet> data)
+ DynamicLeaf(SortedMap<Long, KeyOffsets> data)
{
super(data.firstKey(), data.lastKey());
tokens = data;
@@ -181,7 +176,7 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
protected void serializeData(ByteBuffer buf)
{
- for (Map.Entry<Long, LongSet> entry : tokens.entrySet())
+ for (Map.Entry<Long, KeyOffsets> entry : tokens.entrySet())
createEntry(entry.getKey(), entry.getValue()).serialize(buf);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java b/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java
new file mode 100644
index 0000000..db849fe
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi.disk;
+
+import java.util.*;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import com.carrotsearch.hppc.LongObjectOpenHashMap;
+import com.carrotsearch.hppc.cursors.LongObjectCursor;
+
+public class KeyOffsets extends LongObjectOpenHashMap<long[]>
+{
+ public static final long NO_OFFSET = Long.MIN_VALUE;
+
+ public KeyOffsets() {
+ super(4);
+ }
+
+ public KeyOffsets(int initialCapacity) {
+ super(initialCapacity);
+ }
+
+ public void put(long currentPartitionOffset, long currentRowOffset)
+ {
+ if (containsKey(currentPartitionOffset))
+ super.put(currentPartitionOffset, append(get(currentPartitionOffset), currentRowOffset));
+ else
+ super.put(currentPartitionOffset, asArray(currentRowOffset));
+ }
+
+ public long[] put(long currentPartitionOffset, long[] currentRowOffset)
+ {
+ if (containsKey(currentPartitionOffset))
+ return super.put(currentPartitionOffset, merge(get(currentPartitionOffset), currentRowOffset));
+ else
+ return super.put(currentPartitionOffset, currentRowOffset);
+ }
+
+ public boolean equals(Object obj)
+ {
+ if (!(obj instanceof KeyOffsets))
+ return false;
+
+ KeyOffsets other = (KeyOffsets) obj;
+ if (other.size() != this.size())
+ return false;
+
+ for (LongObjectCursor<long[]> cursor : this)
+ if (!Arrays.equals(cursor.value, other.get(cursor.key)))
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder("KeyOffsets { ");
+ forEach((a, b) -> {
+ sb.append(a).append(": ").append(Arrays.toString(b));
+ });
+ sb.append(" }");
+ return sb.toString();
+ }
+
+ // primitive array creation
+ public static long[] asArray(long... vals)
+ {
+ return vals;
+ }
+
+ private static long[] merge(long[] arr1, long[] arr2)
+ {
+ long[] copy = new long[arr2.length];
+ int written = 0;
+ for (long l : arr2)
+ {
+ if (!ArrayUtils.contains(arr1, l))
+ copy[written++] = l;
+ }
+
+ if (written == 0)
+ return arr1;
+
+ long[] merged = new long[arr1.length + written];
+ System.arraycopy(arr1, 0, merged, 0, arr1.length);
+ System.arraycopy(copy, 0, merged, arr1.length, written);
+ return merged;
+ }
+
+ private static long[] append(long[] arr1, long v)
+ {
+ if (ArrayUtils.contains(arr1, v))
+ return arr1;
+ else
+ return ArrayUtils.add(arr1, v);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
index 4d43cd9..70d24a7 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
@@ -22,8 +22,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.index.sasi.Term;
+import org.apache.cassandra.index.sasi.*;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.index.sasi.plan.Expression.Op;
import org.apache.cassandra.index.sasi.utils.MappedBuffer;
@@ -37,12 +36,12 @@ import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import static org.apache.cassandra.index.sasi.disk.OnDiskBlock.SearchResult;
+import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.TOKEN_BYTES;
public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
{
@@ -106,7 +105,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
protected final long indexSize;
protected final boolean hasMarkedPartials;
- protected final Function<Long, DecoratedKey> keyFetcher;
+ protected final KeyFetcher keyFetcher;
protected final String indexPath;
@@ -116,7 +115,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
protected final ByteBuffer minTerm, maxTerm, minKey, maxKey;
@SuppressWarnings("resource")
- public OnDiskIndex(File index, AbstractType<?> cmp, Function<Long, DecoratedKey> keyReader)
+ public OnDiskIndex(File index, AbstractType<?> cmp, KeyFetcher keyReader)
{
keyFetcher = keyReader;
@@ -635,6 +634,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
{
final long blockEnd = FBUtilities.align(content.position(), OnDiskIndexBuilder.BLOCK_SIZE);
+ // ([int] -1 for sparse, offset for non-sparse)
if (isSparse())
return new PrefetchedTokensIterator(getSparseTokens());
@@ -658,7 +658,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
NavigableMap<Long, Token> individualTokens = new TreeMap<>();
for (int i = 0; i < size; i++)
{
- Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + (8 * i)), keyFetcher);
+ Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + TOKEN_BYTES * i), keyFetcher);
assert token != null;
individualTokens.put(token.get(), token);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
index 4946f06..b6e2da5 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.index.sasi.plan.Expression.Op;
import org.apache.cassandra.index.sasi.sa.IndexedTerm;
import org.apache.cassandra.index.sasi.sa.IntegralSA;
@@ -37,7 +38,6 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import com.carrotsearch.hppc.LongArrayList;
-import com.carrotsearch.hppc.LongSet;
import com.carrotsearch.hppc.ShortArrayList;
import com.google.common.annotations.VisibleForTesting;
@@ -163,7 +163,7 @@ public class OnDiskIndexBuilder
this.marksPartials = marksPartials;
}
- public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long keyPosition)
+ public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long partitionOffset, long rowOffset)
{
if (term.remaining() >= MAX_TERM_SIZE)
{
@@ -183,16 +183,16 @@ public class OnDiskIndexBuilder
estimatedBytes += 64 + 48 + term.remaining();
}
- tokens.add((Long) key.getToken().getTokenValue(), keyPosition);
+ tokens.add((Long) key.getToken().getTokenValue(), partitionOffset, rowOffset);
// calculate key range (based on actual key values) for current index
minKey = (minKey == null || keyComparator.compare(minKey, key.getKey()) > 0) ? key.getKey() : minKey;
maxKey = (maxKey == null || keyComparator.compare(maxKey, key.getKey()) < 0) ? key.getKey() : maxKey;
- // 60 ((boolean(1)*4) + (long(8)*4) + 24) bytes for the LongOpenHashSet created when the keyPosition was added
- // + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key).
+ // 84 ((boolean(1)*4) + (long(8)*4) + 24 + 24) bytes for the LongObjectOpenHashMap<long[]> created
+ // when the keyPosition was added + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key).
// in the case of hash collision for the token we may overestimate but this is extremely rare
- estimatedBytes += 60 + 40 + 8;
+ estimatedBytes += 84 + 40 + 8;
return this;
}
@@ -569,7 +569,7 @@ public class OnDiskIndexBuilder
}
}
- private static class MutableDataBlock extends MutableBlock<InMemoryDataTerm>
+ private class MutableDataBlock extends MutableBlock<InMemoryDataTerm>
{
private static final int MAX_KEYS_SPARSE = 5;
@@ -651,7 +651,7 @@ public class OnDiskIndexBuilder
{
term.serialize(buffer);
buffer.writeByte((byte) keys.getTokenCount());
- for (Pair<Long, LongSet> key : keys)
+ for (Pair<Long, KeyOffsets> key : keys)
buffer.writeLong(key.left);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
index 9fa4e87..c204883 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
@@ -109,7 +109,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
currentKeyPosition = curPosition;
}
- public void nextUnfilteredCluster(Unfiltered unfiltered)
+ public void nextUnfilteredCluster(Unfiltered unfiltered, long currentRowOffset)
{
if (!unfiltered.isRow())
return;
@@ -129,10 +129,15 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
if (index == null)
indexes.put(column, (index = newIndex(columnIndex)));
- index.add(value.duplicate(), currentKey, currentKeyPosition);
+ index.add(value.duplicate(), currentKey, currentKeyPosition, currentRowOffset);
});
}
+ public void nextUnfilteredCluster(Unfiltered unfilteredCluster)
+ {
+ throw new UnsupportedOperationException("SASI Index does not support direct row access.");
+ }
+
public void complete()
{
if (isComplete)
@@ -197,7 +202,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
this.currentBuilder = newIndexBuilder();
}
- public void add(ByteBuffer term, DecoratedKey key, long keyPosition)
+ public void add(ByteBuffer term, DecoratedKey key, long partitoinOffset, long rowOffset)
{
if (term.remaining() == 0)
return;
@@ -235,7 +240,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
}
}
- currentBuilder.add(token, key, keyPosition);
+ currentBuilder.add(token, key, partitoinOffset, rowOffset);
isAdded = true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java b/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java
new file mode 100644
index 0000000..518ad27
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sasi.disk;
+
+import java.util.*;
+import java.util.stream.*;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Primary key of the found row, a combination of the Partition Key
+ * and clustering that belongs to the row.
+ */
+public class RowKey implements Comparable<RowKey>
+{
+
+ public final DecoratedKey decoratedKey;
+ public final Clustering clustering;
+
+ private final ClusteringComparator comparator;
+
+ public RowKey(DecoratedKey primaryKey, Clustering clustering, ClusteringComparator comparator)
+ {
+ this.decoratedKey = primaryKey;
+ this.clustering = clustering;
+ this.comparator = comparator;
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ RowKey rowKey = (RowKey) o;
+
+ if (decoratedKey != null ? !decoratedKey.equals(rowKey.decoratedKey) : rowKey.decoratedKey != null)
+ return false;
+ return clustering != null ? clustering.equals(rowKey.clustering) : rowKey.clustering == null;
+ }
+
+ public int hashCode()
+ {
+ return new HashCodeBuilder().append(decoratedKey).append(clustering).hashCode();
+ }
+
+ public int compareTo(RowKey other)
+ {
+ int cmp = this.decoratedKey.compareTo(other.decoratedKey);
+ if (cmp == 0 && clustering != null)
+ {
+ // Both clustering and rows should match
+ if (clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING || other.clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING)
+ return 0;
+
+ return comparator.compare(this.clustering, other.clustering);
+ }
+ else
+ {
+ return cmp;
+ }
+ }
+
+ public static RowKeyComparator COMPARATOR = new RowKeyComparator();
+
+ public String toString(CFMetaData metadata)
+ {
+ return String.format("RowKey: { pk : %s, clustering: %s}",
+ metadata.getKeyValidator().getString(decoratedKey.getKey()),
+ clustering.toString(metadata));
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("RowKey: { pk : %s, clustering: %s}",
+ ByteBufferUtil.bytesToHex(decoratedKey.getKey()),
+ String.join(",", Arrays.stream(clustering.getRawValues()).map(ByteBufferUtil::bytesToHex).collect(Collectors.toList())));
+ }
+
+ private static class RowKeyComparator implements Comparator<RowKey>
+ {
+ public int compare(RowKey o1, RowKey o2)
+ {
+ return o1.compareTo(o2);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
index 6e64c56..8a11d60 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
@@ -19,8 +19,7 @@ package org.apache.cassandra.index.sasi.disk;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.SortedMap;
+import java.util.*;
import org.apache.cassandra.index.sasi.utils.CombinedTerm;
import org.apache.cassandra.index.sasi.utils.RangeIterator;
@@ -28,7 +27,6 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.Pair;
-import com.carrotsearch.hppc.LongSet;
import com.google.common.collect.Iterators;
/**
@@ -63,17 +61,17 @@ public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder
combinedTerm = term;
}
- public void add(Long token, long keyPosition)
+ public void add(Long token, long partitionOffset, long rowOffset)
{
throw new UnsupportedOperationException();
}
- public void add(SortedMap<Long, LongSet> data)
+ public void add(SortedMap<Long, KeyOffsets> data)
{
throw new UnsupportedOperationException();
}
- public void add(Iterator<Pair<Long, LongSet>> data)
+ public void add(Iterator<Pair<Long, KeyOffsets>> data)
{
throw new UnsupportedOperationException();
}
@@ -83,12 +81,12 @@ public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder
return tokenCount == 0;
}
- public Iterator<Pair<Long, LongSet>> iterator()
+ public Iterator<Pair<Long, KeyOffsets>> iterator()
{
- Iterator<Token> iterator = combinedTerm.getTokenIterator();
- return new AbstractIterator<Pair<Long, LongSet>>()
+ @SuppressWarnings("resource") Iterator<Token> iterator = combinedTerm.getTokenIterator();
+ return new AbstractIterator<Pair<Long, KeyOffsets>>()
{
- protected Pair<Long, LongSet> computeNext()
+ protected Pair<Long, KeyOffsets> computeNext()
{
if (!iterator.hasNext())
return endOfData();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Token.java b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
index 4cd1ea3..2412477 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/Token.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
@@ -18,13 +18,11 @@
package org.apache.cassandra.index.sasi.disk;
import com.google.common.primitives.Longs;
+import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.index.sasi.utils.CombinedValue;
+import org.apache.cassandra.index.sasi.utils.*;
-import com.carrotsearch.hppc.LongSet;
-
-public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKey>
+public abstract class Token implements CombinedValue<Long>, Iterable<RowKey>
{
protected final long token;
@@ -38,7 +36,7 @@ public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKe
return token;
}
- public abstract LongSet getOffsets();
+ public abstract KeyOffsets getOffsets();
public int compareTo(CombinedValue<Long> o)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d857b46/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
index c69ce00..1969627 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
@@ -19,22 +19,21 @@ package org.apache.cassandra.index.sasi.disk;
import java.io.IOException;
import java.util.*;
+import java.util.stream.*;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.index.sasi.utils.AbstractIterator;
-import org.apache.cassandra.index.sasi.utils.CombinedValue;
-import org.apache.cassandra.index.sasi.utils.MappedBuffer;
-import org.apache.cassandra.index.sasi.utils.RangeIterator;
-import org.apache.cassandra.utils.MergeIterator;
-
-import com.carrotsearch.hppc.LongOpenHashSet;
-import com.carrotsearch.hppc.LongSet;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import org.apache.commons.lang3.builder.HashCodeBuilder;
-import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType;
+import com.carrotsearch.hppc.cursors.LongObjectCursor;
+import org.apache.cassandra.index.sasi.*;
+import org.apache.cassandra.index.sasi.disk.Descriptor.*;
+import org.apache.cassandra.index.sasi.utils.AbstractIterator;
+import org.apache.cassandra.index.sasi.utils.*;
+import org.apache.cassandra.utils.*;
+
+import static org.apache.cassandra.index.sasi.disk.Descriptor.Version.*;
+import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.*;
// Note: all of the seek-able offsets contained in TokenTree should be sizeof(long)
// even if currently only lower int portion of them if used, because that makes
@@ -42,9 +41,6 @@ import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType;
// without any on-disk format changes and/or re-indexing if one day we'll have a need to.
public class TokenTree
{
- private static final int LONG_BYTES = Long.SIZE / 8;
- private static final int SHORT_BYTES = Short.SIZE / 8;
-
private final Descriptor descriptor;
private final MappedBuffer file;
private final long startPos;
@@ -66,8 +62,7 @@ public class TokenTree
file.position(startPos + TokenTreeBuilder.SHARED_HEADER_BYTES);
- if (!validateMagic())
- throw new IllegalArgumentException("invalid token tree");
+ validateMagic();
tokenCount = file.getLong();
treeMinToken = file.getLong();
@@ -79,12 +74,12 @@ public class TokenTree
return tokenCount;
}
- public RangeIterator<Long, Token> iterator(Function<Long, DecoratedKey> keyFetcher)
+ public RangeIterator<Long, Token> iterator(KeyFetcher keyFetcher)
{
return new TokenTreeIterator(file.duplicate(), keyFetcher);
}
- public OnDiskToken get(final long searchToken, Function<Long, DecoratedKey> keyFetcher)
+ public OnDiskToken get(final long searchToken, KeyFetcher keyFetcher)
{
seekToLeaf(searchToken, file);
long leafStart = file.position();
@@ -95,21 +90,24 @@ public class TokenTree
file.position(leafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES);
- OnDiskToken token = OnDiskToken.getTokenAt(file, tokenIndex, leafSize, keyFetcher);
+ OnDiskToken token = getTokenAt(file, tokenIndex, leafSize, keyFetcher);
+
return token.get().equals(searchToken) ? token : null;
}
- private boolean validateMagic()
+ private void validateMagic()
{
- switch (descriptor.version.toString())
- {
- case Descriptor.VERSION_AA:
- return true;
- case Descriptor.VERSION_AB:
- return TokenTreeBuilder.AB_MAGIC == file.getShort();
- default:
- return false;
- }
+ if (descriptor.version == aa)
+ return;
+
+ short magic = file.getShort();
+ if (descriptor.version == Version.ab && magic == TokenTreeBuilder.AB_MAGIC)
+ return;
+
+ if (descriptor.version == Version.ac && magic == TokenTreeBuilder.AC_MAGIC)
+ return;
+
+ throw new IllegalArgumentException("invalid token tree. Written magic: '" + ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(magic)) + "'");
}
// finds leaf that *could* contain token
@@ -136,18 +134,16 @@ public class TokenTree
long minToken = file.getLong();
long maxToken = file.getLong();
- long seekBase = blockStart + TokenTreeBuilder.BLOCK_HEADER_BYTES;
+ long seekBase = blockStart + BLOCK_HEADER_BYTES;
if (minToken > token)
{
// seek to beginning of child offsets to locate first child
- file.position(seekBase + tokenCount * LONG_BYTES);
- blockStart = (startPos + (int) file.getLong());
+ file.position(seekBase + tokenCount * TOKEN_BYTES);
}
else if (maxToken < token)
{
// seek to end of child offsets to locate last child
- file.position(seekBase + (2 * tokenCount) * LONG_BYTES);
- blockStart = (startPos + (int) file.getLong());
+ file.position(seekBase + (2 * tokenCount) * TOKEN_BYTES);
}
else
{
@@ -158,12 +154,11 @@ public class TokenTree
// file pointer is now at beginning of offsets
if (offsetIndex == tokenCount)
- file.position(file.position() + (offsetIndex * LONG_BYTES));
+ file.position(file.position() + (offsetIndex * TOKEN_BYTES));
else
- file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * LONG_BYTES);
-
- blockStart = (startPos + (int) file.getLong());
+ file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * TOKEN_BYTES);
}
+ blockStart = (startPos + (int) file.getLong());
}
}
@@ -172,8 +167,7 @@ public class TokenTree
short offsetIndex = 0;
for (int i = 0; i < tokenCount; i++)
{
- long readToken = file.getLong();
- if (searchToken < readToken)
+ if (searchToken < file.getLong())
break;
offsetIndex++;
@@ -193,10 +187,7 @@ public class TokenTree
while (start <= end)
{
middle = start + ((end - start) >> 1);
-
- // each entry is 16 bytes wide, token is in bytes 4-11
- long token = file.getLong(base + (middle * (2 * LONG_BYTES) + 4));
-
+ long token = file.getLong(base + middle * LEAF_ENTRY_BYTES + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES));
if (token == searchToken)
break;
@@ -209,9 +200,9 @@ public class TokenTree
return (short) middle;
}
- public class TokenTreeIterator extends RangeIterator<Long, Token>
+ private class TokenTreeIterator extends RangeIterator<Long, Token>
{
- private final Function<Long, DecoratedKey> keyFetcher;
+ private final KeyFetcher keyFetcher;
private final MappedBuffer file;
private long currentLeafStart;
@@ -224,7 +215,7 @@ public class TokenTree
protected boolean firstIteration = true;
private boolean lastLeaf;
- TokenTreeIterator(MappedBuffer file, Function<Long, DecoratedKey> keyFetcher)
+ TokenTreeIterator(MappedBuffer file, KeyFetcher keyFetcher)
{
super(treeMinToken, treeMaxToken, tokenCount);
@@ -314,13 +305,13 @@ public class TokenTree
private Token getTokenAt(int idx)
{
- return OnDiskToken.getTokenAt(file, idx, leafSize, keyFetcher);
+ return TokenTree.this.getTokenAt(file, idx, leafSize, keyFetcher);
}
private long getTokenPosition(int idx)
{
- // skip 4 byte entry header to get position pointing directly at the entry's token
- return OnDiskToken.getEntryPosition(idx, file) + (2 * SHORT_BYTES);
+ // skip entry header to get position pointing directly at the entry's token
+ return TokenTree.this.getEntryPosition(idx, file, descriptor) + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES);
}
private void seekToNextLeaf()
@@ -347,15 +338,15 @@ public class TokenTree
}
}
- public static class OnDiskToken extends Token
+ public class OnDiskToken extends Token
{
private final Set<TokenInfo> info = new HashSet<>(2);
- private final Set<DecoratedKey> loadedKeys = new TreeSet<>(DecoratedKey.comparator);
+ private final Set<RowKey> loadedKeys = new TreeSet<>(RowKey.COMPARATOR);
- public OnDiskToken(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
+ private OnDiskToken(MappedBuffer buffer, long position, short leafSize, KeyFetcher keyFetcher)
{
- super(buffer.getLong(position + (2 * SHORT_BYTES)));
- info.add(new TokenInfo(buffer, position, leafSize, keyFetcher));
+ super(buffer.getLong(position + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES)));
+ info.add(new TokenInfo(buffer, position, leafSize, keyFetcher, descriptor));
}
public void merge(CombinedValue<Long> other)
@@ -377,9 +368,9 @@ public class TokenTree
}
}
- public Iterator<DecoratedKey> iterator()
+ public Iterator<RowKey> iterator()
{
- List<Iterator<DecoratedKey>> keys = new ArrayList<>(info.size());
+ List<Iterator<RowKey>> keys = new ArrayList<>(info.size());
for (TokenInfo i : info)
keys.add(i.iterator());
@@ -387,68 +378,72 @@ public class TokenTree
if (!loadedKeys.isEmpty())
keys.add(loadedKeys.iterator());
- return MergeIterator.get(keys, DecoratedKey.comparator, new MergeIterator.Reducer<DecoratedKey, DecoratedKey>()
+ return MergeIterator.get(keys, RowKey.COMPARATOR, new MergeIterator.Reducer<RowKey, RowKey>()
{
- DecoratedKey reduced = null;
+ RowKey reduced = null;
public boolean trivialReduceIsTrivial()
{
return true;
}
- public void reduce(int idx, DecoratedKey current)
+ public void reduce(int idx, RowKey current)
{
reduced = current;
}
- protected DecoratedKey getReduced()
+ protected RowKey getReduced()
{
return reduced;
}
});
}
- public LongSet getOffsets()
+ public KeyOffsets getOffsets()
{
- LongSet offsets = new LongOpenHashSet(4);
+ KeyOffsets offsets = new KeyOffsets();
for (TokenInfo i : info)
{
- for (long offset : i.fetchOffsets())
- offsets.add(offset);
+ for (LongObjectCursor<long[]> offset : i.fetchOffsets())
+ offsets.put(offset.key, offset.value);
}
return offsets;
}
+ }
- public static OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, Function<Long, DecoratedKey> keyFetcher)
- {
- return new OnDiskToken(buffer, getEntryPosition(idx, buffer), leafSize, keyFetcher);
- }
+ private OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, KeyFetcher keyFetcher)
+ {
+ return new OnDiskToken(buffer, getEntryPosition(idx, buffer, descriptor), leafSize, keyFetcher);
+ }
- private static long getEntryPosition(int idx, MappedBuffer file)
- {
- // info (4 bytes) + token (8 bytes) + offset (4 bytes) = 16 bytes
- return file.position() + (idx * (2 * LONG_BYTES));
- }
+ private long getEntryPosition(int idx, MappedBuffer file, Descriptor descriptor)
+ {
+ if (descriptor.version.compareTo(Version.ac) < 0)
+ return file.position() + (idx * LEGACY_LEAF_ENTRY_BYTES);
+
+ // skip n entries, to the entry with the given index
+ return file.position() + (idx * LEAF_ENTRY_BYTES);
}
private static class TokenInfo
{
private final MappedBuffer buffer;
- private final Function<Long, DecoratedKey> keyFetcher;
-
+ private final KeyFetcher keyFetcher;
+ private final Descriptor descriptor;
private final long position;
private final short leafSize;
- public TokenInfo(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
+ public TokenInfo(MappedBuffer buffer, long position, short leafSize, KeyFetcher keyFetcher, Descriptor descriptor)
{
this.keyFetcher = keyFetcher;
this.buffer = buffer;
this.position = position;
this.leafSize = leafSize;
+ this.descriptor = descriptor;
}
- public Iterator<DecoratedKey> iterator()
+ public Iterator<RowKey> iterator()
{
return new KeyIterator(keyFetcher, fetchOffsets());
}
@@ -465,59 +460,154 @@ public class TokenTree
TokenInfo o = (TokenInfo) other;
return keyFetcher == o.keyFetcher && position == o.position;
+
}
- private long[] fetchOffsets()
+ /**
+ * Legacy leaf storage format (used for reading data formats before AC):
+ *
+ * [(short) leaf type][(short) offset extra bytes][(long) token][(int) offsetData]
+ *
+ * Many pairs can be encoded into long+int.
+ *
+ * Simple entry: offset fits into (int)
+ *
+ * [(short) leaf type][(short) offset extra bytes][(long) token][(int) offsetData]
+ *
+ * FactoredOffset: a single offset, offset fits into (long)+(int) bits:
+ *
+ * [(short) leaf type][(short) 16 bytes of remained offset][(long) token][(int) top 32 bits of offset]
+ *
+ * PackedCollisionEntry: packs the two offset entries into int and a short (if both of them fit into
+ * (long) and one of them fits into (int))
+ *
+ * [(short) leaf type][(short) 16 the offset that'd fit into short][(long) token][(int) 32 bits of offset that'd fit into int]
+ *
+ * Otherwise, the rest gets packed into limited-size overflow collision entry
+ *
+ * [(short) leaf type][(short) count][(long) token][(int) start index]
+ */
+ private KeyOffsets fetchOffsetsLegacy()
{
short info = buffer.getShort(position);
// offset extra is unsigned short (right-most 16 bits of 48 bits allowed for an offset)
- int offsetExtra = buffer.getShort(position + SHORT_BYTES) & 0xFFFF;
+ int offsetExtra = buffer.getShort(position + Short.BYTES) & 0xFFFF;
// is the it left-most (32-bit) base of the actual offset in the index file
- int offsetData = buffer.getInt(position + (2 * SHORT_BYTES) + LONG_BYTES);
+ int offsetData = buffer.getInt(position + (2 * Short.BYTES) + Long.BYTES);
EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK);
+ KeyOffsets rowOffsets = new KeyOffsets();
switch (type)
{
case SIMPLE:
- return new long[] { offsetData };
-
+ rowOffsets.put(offsetData, KeyOffsets.NO_OFFSET);
+ break;
case OVERFLOW:
- long[] offsets = new long[offsetExtra]; // offsetShort contains count of tokens
- long offsetPos = (buffer.position() + (2 * (leafSize * LONG_BYTES)) + (offsetData * LONG_BYTES));
+ long offsetPos = (buffer.position() + (2 * (leafSize * Long.BYTES)) + (offsetData * Long.BYTES));
for (int i = 0; i < offsetExtra; i++)
- offsets[i] = buffer.getLong(offsetPos + (i * LONG_BYTES));
+ {
+ long offset = buffer.getLong(offsetPos + (i * Long.BYTES));
+ rowOffsets.put(offset, KeyOffsets.NO_OFFSET);
+ }
+ break;
+ case FACTORED:
+ long offset = (((long) offsetData) << Short.SIZE) + offsetExtra;
+ rowOffsets.put(offset, KeyOffsets.NO_OFFSET);
+ break;
+ case PACKED:
+ rowOffsets.put(offsetExtra, KeyOffsets.NO_OFFSET);
+ rowOffsets.put(offsetData, KeyOffsets.NO_OFFSET);
+ break;
+ default:
+ throw new IllegalStateException("Unknown entry type: " + type);
+ }
+ return rowOffsets;
+ }
- return offsets;
+ private KeyOffsets fetchOffsets()
+ {
+ if (descriptor.version.compareTo(Version.ac) < 0)
+ return fetchOffsetsLegacy();
- case FACTORED:
- return new long[] { (((long) offsetData) << Short.SIZE) + offsetExtra };
+ short info = buffer.getShort(position);
+ EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK);
+ KeyOffsets rowOffsets = new KeyOffsets();
+ long baseOffset = position + LEAF_ENTRY_TYPE_BYTES + TOKEN_BYTES;
+ switch (type)
+ {
+ case SIMPLE:
+ long partitionOffset = buffer.getLong(baseOffset);
+ long rowOffset = buffer.getLong(baseOffset + LEAF_PARTITON_OFFSET_BYTES);
+
+ rowOffsets.put(partitionOffset, rowOffset);
+ break;
case PACKED:
- return new long[] { offsetExtra, offsetData };
+ long partitionOffset1 = buffer.getInt(baseOffset);
+ long rowOffset1 = buffer.getInt(baseOffset + LEAF_PARTITON_OFFSET_PACKED_BYTES);
+
+ long partitionOffset2 = buffer.getInt(baseOffset + LEAF_PARTITON_OFFSET_PACKED_BYTES + LEAF_ROW_OFFSET_PACKED_BYTES);
+ long rowOffset2 = buffer.getInt(baseOffset + 2 * LEAF_PARTITON_OFFSET_PACKED_BYTES + LEAF_ROW_OFFSET_PACKED_BYTES);
+ rowOffsets.put(partitionOffset1, rowOffset1);
+ rowOffsets.put(partitionOffset2, rowOffset2);
+ break;
+ case OVERFLOW:
+ long collisionOffset = buffer.getLong(baseOffset);
+ long count = buffer.getLong(baseOffset + LEAF_PARTITON_OFFSET_BYTES);
+
+ // Skip leaves and collision offsets that do not belong to current token
+ long offsetPos = buffer.position() + leafSize * LEAF_ENTRY_BYTES + collisionOffset * COLLISION_ENTRY_BYTES;
+
+ for (int i = 0; i < count; i++)
+ {
+ long currentPartitionOffset = buffer.getLong(offsetPos + i * COLLISION_ENTRY_BYTES);
+ long currentRowOffset = buffer.getLong(offsetPos + i * COLLISION_ENTRY_BYTES + LEAF_PARTITON_OFFSET_BYTES);
+
+ rowOffsets.put(currentPartitionOffset, currentRowOffset);
+ }
+ break;
default:
throw new IllegalStateException("Unknown entry type: " + type);
}
+
+
+ return rowOffsets;
}
}
- private static class KeyIterator extends AbstractIterator<DecoratedKey>
+ private static class KeyIterator extends AbstractIterator<RowKey>
{
- private final Function<Long, DecoratedKey> keyFetcher;
- private final long[] offsets;
- private int index = 0;
+ private final KeyFetcher keyFetcher;
+ private final Iterator<LongObjectCursor<long[]>> offsets;
+ private long currentPartitionKey;
+ private PrimitiveIterator.OfLong currentCursor = null;
- public KeyIterator(Function<Long, DecoratedKey> keyFetcher, long[] offsets)
+ public KeyIterator(KeyFetcher keyFetcher, KeyOffsets offsets)
{
this.keyFetcher = keyFetcher;
- this.offsets = offsets;
+ this.offsets = offsets.iterator();
}
- public DecoratedKey computeNext()
+ public RowKey computeNext()
{
- return index < offsets.length ? keyFetcher.apply(offsets[index++]) : endOfData();
+ if (currentCursor != null && currentCursor.hasNext())
+ {
+ return keyFetcher.getRowKey(currentPartitionKey, currentCursor.nextLong());
+ }
+ else if (offsets.hasNext())
+ {
+ LongObjectCursor<long[]> cursor = offsets.next();
+ currentPartitionKey = cursor.key;
+ currentCursor = LongStream.of(cursor.value).iterator();
+
+ return keyFetcher.getRowKey(currentPartitionKey, currentCursor.nextLong());
+ }
+ else
+ {
+ return endOfData();
+ }
}
}
-}
\ No newline at end of file
+}