You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2016/11/17 23:27:40 UTC
[3/4] cassandra git commit: Revert "Add row offset support to SASI"
Revert "Add row offset support to SASI"
This reverts commit 7d857b46fb070548bf5e5f6ff81db588f08ec22a.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/490c1c27
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/490c1c27
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/490c1c27
Branch: refs/heads/trunk
Commit: 490c1c27c9b700f14212d9591a516ddb8d0865c7
Parents: a1eef56
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Thu Nov 17 15:20:04 2016 -0800
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Thu Nov 17 15:20:04 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../org/apache/cassandra/db/ColumnIndex.java | 6 +-
.../apache/cassandra/index/sasi/KeyFetcher.java | 98 -------
.../apache/cassandra/index/sasi/SASIIndex.java | 11 +-
.../cassandra/index/sasi/SASIIndexBuilder.java | 13 +-
.../cassandra/index/sasi/SSTableIndex.java | 41 ++-
.../cassandra/index/sasi/conf/ColumnIndex.java | 4 +-
.../index/sasi/conf/view/RangeTermTree.java | 4 -
.../sasi/disk/AbstractTokenTreeBuilder.java | 276 ++++++++----------
.../cassandra/index/sasi/disk/Descriptor.java | 33 +--
.../sasi/disk/DynamicTokenTreeBuilder.java | 59 ++--
.../cassandra/index/sasi/disk/KeyOffsets.java | 115 --------
.../cassandra/index/sasi/disk/OnDiskIndex.java | 12 +-
.../index/sasi/disk/OnDiskIndexBuilder.java | 16 +-
.../index/sasi/disk/PerSSTableIndexWriter.java | 13 +-
.../cassandra/index/sasi/disk/RowKey.java | 108 -------
.../index/sasi/disk/StaticTokenTreeBuilder.java | 18 +-
.../apache/cassandra/index/sasi/disk/Token.java | 9 +-
.../cassandra/index/sasi/disk/TokenTree.java | 288 +++++++------------
.../index/sasi/disk/TokenTreeBuilder.java | 72 ++---
.../index/sasi/memory/IndexMemtable.java | 8 +-
.../index/sasi/memory/KeyRangeIterator.java | 49 ++--
.../cassandra/index/sasi/memory/MemIndex.java | 4 +-
.../index/sasi/memory/SkipListMemIndex.java | 12 +-
.../index/sasi/memory/TrieMemIndex.java | 45 +--
.../index/sasi/plan/QueryController.java | 49 ++--
.../cassandra/index/sasi/plan/QueryPlan.java | 174 +++--------
.../io/sstable/format/SSTableFlushObserver.java | 5 -
.../io/sstable/format/SSTableReader.java | 33 +--
.../io/sstable/format/big/BigTableWriter.java | 8 +-
.../org/apache/cassandra/utils/obs/BitUtil.java | 2 +-
test/data/legacy-sasi/on-disk-sa-int2.db | Bin 12312 -> 0 bytes
.../cassandra/index/sasi/SASIIndexTest.java | 25 +-
.../index/sasi/disk/KeyOffsetsTest.java | 48 ----
.../index/sasi/disk/OnDiskIndexTest.java | 216 +++++++-------
.../sasi/disk/PerSSTableIndexWriterTest.java | 112 ++------
.../index/sasi/disk/TokenTreeTest.java | 208 +++++++-------
.../index/sasi/plan/OperationTest.java | 2 +-
.../index/sasi/utils/KeyConverter.java | 69 -----
.../index/sasi/utils/LongIterator.java | 8 +-
.../sasi/utils/RangeUnionIteratorTest.java | 17 --
41 files changed, 745 insertions(+), 1546 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index db06341..6ca26f9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -45,7 +45,6 @@
* Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
* Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
* Add JMH benchmarks.jar (CASSANDRA-12586)
- * Add row offset support to SASI (CASSANDRA-11990)
* Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
* Add keep-alive to streaming (CASSANDRA-11841)
* Tracing payload is passed through newSession(..) (CASSANDRA-11706)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 8ea1272..de1b1df 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -121,10 +121,9 @@ public class ColumnIndex
{
Row staticRow = iterator.staticRow();
- long startPosition = currentPosition();
UnfilteredSerializer.serializer.serializeStaticRow(staticRow, header, writer, version);
if (!observers.isEmpty())
- observers.forEach((o) -> o.nextUnfilteredCluster(staticRow, startPosition));
+ observers.forEach((o) -> o.nextUnfilteredCluster(staticRow));
}
}
@@ -235,7 +234,6 @@ public class ColumnIndex
private void add(Unfiltered unfiltered) throws IOException
{
- final long origPos = writer.position();
long pos = currentPosition();
if (firstClustering == null)
@@ -249,7 +247,7 @@ public class ColumnIndex
// notify observers about each new row
if (!observers.isEmpty())
- observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered, origPos));
+ observers.forEach((o) -> o.nextUnfilteredCluster(unfiltered));
lastClustering = unfiltered.clustering();
previousRowStart = pos;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java b/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java
deleted file mode 100644
index 80ee167..0000000
--- a/src/java/org/apache/cassandra/index/sasi/KeyFetcher.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.index.sasi;
-
-import java.io.IOException;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.index.sasi.disk.*;
-import org.apache.cassandra.io.*;
-import org.apache.cassandra.io.sstable.format.*;
-
-
-public interface KeyFetcher
-{
- public Clustering getClustering(long offset);
- public DecoratedKey getPartitionKey(long offset);
-
- public RowKey getRowKey(long partitionOffset, long rowOffset);
-
- /**
- * Fetches clustering and partition key from the sstable.
- *
- * Currently, clustering key is fetched from the data file of the sstable and partition key is
- * read from the index file. Reading from index file helps us to warm up key cache in this case.
- */
- public static class SSTableKeyFetcher implements KeyFetcher
- {
- private final SSTableReader sstable;
-
- public SSTableKeyFetcher(SSTableReader reader)
- {
- sstable = reader;
- }
-
- @Override
- public Clustering getClustering(long offset)
- {
- try
- {
- return sstable.clusteringAt(offset);
- }
- catch (IOException e)
- {
- throw new FSReadError(new IOException("Failed to read clustering from " + sstable.descriptor, e), sstable.getFilename());
- }
- }
-
- @Override
- public DecoratedKey getPartitionKey(long offset)
- {
- try
- {
- return sstable.keyAt(offset);
- }
- catch (IOException e)
- {
- throw new FSReadError(new IOException("Failed to read key from " + sstable.descriptor, e), sstable.getFilename());
- }
- }
-
- @Override
- public RowKey getRowKey(long partitionOffset, long rowOffset)
- {
- if (rowOffset == KeyOffsets.NO_OFFSET)
- return new RowKey(getPartitionKey(partitionOffset), null, sstable.metadata.comparator);
- else
- return new RowKey(getPartitionKey(partitionOffset), getClustering(rowOffset), sstable.metadata.comparator);
- }
-
- public int hashCode()
- {
- return sstable.descriptor.hashCode();
- }
-
- public boolean equals(Object other)
- {
- return other instanceof SSTableKeyFetcher
- && sstable.descriptor.equals(((SSTableKeyFetcher) other).sstable.descriptor);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
index 65953a9..4375964 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java
@@ -46,7 +46,6 @@ import org.apache.cassandra.index.sasi.conf.ColumnIndex;
import org.apache.cassandra.index.sasi.conf.IndexMode;
import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder.Mode;
import org.apache.cassandra.index.sasi.disk.PerSSTableIndexWriter;
-import org.apache.cassandra.index.sasi.disk.RowKey;
import org.apache.cassandra.index.sasi.plan.QueryPlan;
import org.apache.cassandra.index.transactions.IndexTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
@@ -183,14 +182,6 @@ public class SASIIndex implements Index, INotificationConsumer
return getTruncateTask(FBUtilities.timestampMicros());
}
- public Callable<?> getTruncateTask(Collection<SSTableReader> sstablesToRebuild)
- {
- return () -> {
- index.dropData(sstablesToRebuild);
- return null;
- };
- }
-
public Callable<?> getTruncateTask(long truncatedAt)
{
return () -> {
@@ -261,7 +252,7 @@ public class SASIIndex implements Index, INotificationConsumer
public void insertRow(Row row)
{
if (isNewData())
- adjustMemtableSize(index.index(new RowKey(key, row.clustering(), baseCfs.getComparator()), row), opGroup);
+ adjustMemtableSize(index.index(key, row), opGroup);
}
public void updateRow(Row oldRow, Row newRow)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
index d6706ea..d50875a 100644
--- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java
@@ -94,23 +94,16 @@ class SASIIndexBuilder extends SecondaryIndexBuilder
{
RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
dataFile.seek(indexEntry.position);
- int staticOffset = ByteBufferUtil.readWithShortLength(dataFile).remaining(); // key
+ ByteBufferUtil.readWithShortLength(dataFile); // key
try (SSTableIdentityIterator partition = SSTableIdentityIterator.create(sstable, dataFile, key))
{
// if the row has statics attached, it has to be indexed separately
if (cfs.metadata.hasStaticColumns())
- {
- long staticPosition = indexEntry.position + staticOffset;
- indexWriter.nextUnfilteredCluster(partition.staticRow(), staticPosition);
- }
+ indexWriter.nextUnfilteredCluster(partition.staticRow());
- long position = dataFile.getPosition();
while (partition.hasNext())
- {
- indexWriter.nextUnfilteredCluster(partition.next(), position);
- position = dataFile.getPosition();
- }
+ indexWriter.nextUnfilteredCluster(partition.next());
}
}
catch (IOException ex)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
index f9c8abf..c67c39c 100644
--- a/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/SSTableIndex.java
@@ -18,22 +18,28 @@
package org.apache.cassandra.index.sasi;
import java.io.File;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.index.sasi.conf.ColumnIndex;
-import org.apache.cassandra.index.sasi.disk.*;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndex;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
import org.apache.cassandra.index.sasi.disk.Token;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.commons.lang3.builder.HashCodeBuilder;
+import com.google.common.base.Function;
+
public class SSTableIndex
{
private final ColumnIndex columnIndex;
@@ -59,7 +65,7 @@ public class SSTableIndex
sstable.getFilename(),
columnIndex.getIndexName());
- this.index = new OnDiskIndex(indexFile, validator, new KeyFetcher.SSTableKeyFetcher(sstable));
+ this.index = new OnDiskIndex(indexFile, validator, new DecoratedKeyFetcher(sstable));
}
public OnDiskIndexBuilder.Mode mode()
@@ -157,5 +163,36 @@ public class SSTableIndex
return String.format("SSTableIndex(column: %s, SSTable: %s)", columnIndex.getColumnName(), sstable.descriptor);
}
+ private static class DecoratedKeyFetcher implements Function<Long, DecoratedKey>
+ {
+ private final SSTableReader sstable;
+
+ DecoratedKeyFetcher(SSTableReader reader)
+ {
+ sstable = reader;
+ }
+
+ public DecoratedKey apply(Long offset)
+ {
+ try
+ {
+ return sstable.keyAt(offset);
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(new IOException("Failed to read key from " + sstable.descriptor, e), sstable.getFilename());
+ }
+ }
+
+ public int hashCode()
+ {
+ return sstable.descriptor.hashCode();
+ }
+ public boolean equals(Object other)
+ {
+ return other instanceof DecoratedKeyFetcher
+ && sstable.descriptor.equals(((DecoratedKeyFetcher) other).sstable.descriptor);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
index 459e5c3..0958113 100644
--- a/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/conf/ColumnIndex.java
@@ -30,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.AsciiType;
@@ -39,7 +40,6 @@ import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
import org.apache.cassandra.index.sasi.conf.view.View;
import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
-import org.apache.cassandra.index.sasi.disk.RowKey;
import org.apache.cassandra.index.sasi.disk.Token;
import org.apache.cassandra.index.sasi.memory.IndexMemtable;
import org.apache.cassandra.index.sasi.plan.Expression;
@@ -99,7 +99,7 @@ public class ColumnIndex
return keyValidator;
}
- public long index(RowKey key, Row row)
+ public long index(DecoratedKey key, Row row)
{
return getCurrentMemtable().index(key, getValueOf(column, row, FBUtilities.nowInSeconds()));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
index 2775c29..d6b4551 100644
--- a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
+++ b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.index.sasi.conf.view;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -47,9 +46,6 @@ public class RangeTermTree implements TermTree
public Set<SSTableIndex> search(Expression e)
{
- if (e == null)
- return Collections.emptySet();
-
ByteBuffer minTerm = e.lower == null ? min : e.lower.value;
ByteBuffer maxTerm = e.upper == null ? max : e.upper.value;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
index 9245960..18994de 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/AbstractTokenTreeBuilder.java
@@ -20,18 +20,19 @@ package org.apache.cassandra.index.sasi.disk;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.function.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
-import com.carrotsearch.hppc.LongArrayList;
-import com.carrotsearch.hppc.cursors.LongCursor;
-import com.carrotsearch.hppc.cursors.LongObjectCursor;
-import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
+import com.carrotsearch.hppc.LongArrayList;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
+
public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
{
protected int numBlocks;
@@ -64,7 +65,7 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
public int serializedSize()
{
if (numBlocks == 1)
- return BLOCK_HEADER_BYTES + ((int) tokenCount * LEAF_ENTRY_BYTES);
+ return (BLOCK_HEADER_BYTES + ((int) tokenCount * 16));
else
return numBlocks * BLOCK_BYTES;
}
@@ -111,15 +112,6 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
buffer.clear();
}
- /**
- * Tree node,
- *
- * B+-tree consists of root, interior nodes and leaves. Root can be either a node or a leaf.
- *
- * Depending on the concrete implementation of {@code TokenTreeBuilder}
- * leaf can be partial or static (in case of {@code StaticTokenTreeBuilder} or dynamic in case
- * of {@code DynamicTokenTreeBuilder}
- */
protected abstract class Node
{
protected InteriorNode parent;
@@ -187,16 +179,8 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
alignBuffer(buf, BLOCK_HEADER_BYTES);
}
- /**
- * Shared header part, written for all node types:
- * [ info byte ] [ token count ] [ min node token ] [ max node token ]
- * [ 1b ] [ 2b (short) ] [ 8b (long) ] [ 8b (long) ]
- **/
private abstract class Header
{
- /**
- * Serializes the shared part of the header
- */
public void serialize(ByteBuffer buf)
{
buf.put(infoByte())
@@ -208,12 +192,6 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
protected abstract byte infoByte();
}
- /**
- * In addition to shared header part, root header stores version information,
- * overall token count and min/max tokens for the whole tree:
- * [ magic ] [ overall token count ] [ min tree token ] [ max tree token ]
- * [ 2b (short) ] [ 8b (long) ] [ 8b (long) ] [ 8b (long) ]
- */
private class RootHeader extends Header
{
public void serialize(ByteBuffer buf)
@@ -229,21 +207,19 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
{
// if leaf, set leaf indicator and last leaf indicator (bits 0 & 1)
// if not leaf, clear both bits
- return isLeaf() ? ENTRY_TYPE_MASK : 0;
+ return (byte) ((isLeaf()) ? 3 : 0);
}
protected void writeMagic(ByteBuffer buf)
{
switch (Descriptor.CURRENT_VERSION)
{
- case ab:
+ case Descriptor.VERSION_AB:
buf.putShort(AB_MAGIC);
break;
- case ac:
- buf.putShort(AC_MAGIC);
- break;
+
default:
- throw new RuntimeException("Unsupported version");
+ break;
}
}
@@ -273,12 +249,6 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
}
- /**
- * Leaf consists of
- * - header (format described in {@code Header} )
- * - data (format described in {@code LeafEntry})
- * - overflow collision entries, that hold {@value OVERFLOW_TRAILER_CAPACITY} of {@code RowOffset}.
- */
protected abstract class Leaf extends Node
{
protected LongArrayList overflowCollisions;
@@ -309,98 +279,82 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
protected abstract void serializeData(ByteBuffer buf);
- protected LeafEntry createEntry(final long tok, final KeyOffsets offsets)
+ protected LeafEntry createEntry(final long tok, final LongSet offsets)
{
- LongArrayList rawOffsets = new LongArrayList(offsets.size());
-
- offsets.forEach(new Consumer<LongObjectCursor<long[]>>()
- {
- public void accept(LongObjectCursor<long[]> cursor)
- {
- for (long l : cursor.value)
- {
- rawOffsets.add(cursor.key);
- rawOffsets.add(l);
- }
- }
- });
-
- int offsetCount = rawOffsets.size();
+ int offsetCount = offsets.size();
switch (offsetCount)
{
case 0:
throw new AssertionError("no offsets for token " + tok);
+ case 1:
+ long offset = offsets.toArray()[0];
+ if (offset > MAX_OFFSET)
+ throw new AssertionError("offset " + offset + " cannot be greater than " + MAX_OFFSET);
+ else if (offset <= Integer.MAX_VALUE)
+ return new SimpleLeafEntry(tok, offset);
+ else
+ return new FactoredOffsetLeafEntry(tok, offset);
case 2:
- return new SimpleLeafEntry(tok, rawOffsets.get(0), rawOffsets.get(1));
+ long[] rawOffsets = offsets.toArray();
+ if (rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= Integer.MAX_VALUE &&
+ (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= Short.MAX_VALUE))
+ return new PackedCollisionLeafEntry(tok, rawOffsets);
+ else
+ return createOverflowEntry(tok, offsetCount, offsets);
default:
- assert offsetCount % 2 == 0;
- if (offsetCount == 4)
- {
- if (rawOffsets.get(0) < Integer.MAX_VALUE && rawOffsets.get(1) < Integer.MAX_VALUE &&
- rawOffsets.get(2) < Integer.MAX_VALUE && rawOffsets.get(3) < Integer.MAX_VALUE)
- {
- return new PackedCollisionLeafEntry(tok, (int)rawOffsets.get(0), (int) rawOffsets.get(1),
- (int) rawOffsets.get(2), (int) rawOffsets.get(3));
- }
- }
- return createOverflowEntry(tok, offsetCount, rawOffsets);
+ return createOverflowEntry(tok, offsetCount, offsets);
}
}
- private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongArrayList offsets)
+ private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongSet offsets)
{
if (overflowCollisions == null)
- overflowCollisions = new LongArrayList(offsetCount);
-
- int overflowCount = (overflowCollisions.size() + offsetCount) / 2;
- if (overflowCount >= OVERFLOW_TRAILER_CAPACITY)
- throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf, but had: " + overflowCount);
+ overflowCollisions = new LongArrayList();
- LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) (overflowCollisions.size() / 2), (short) (offsetCount / 2));
- overflowCollisions.addAll(offsets);
+ LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) overflowCollisions.size(), (short) offsetCount);
+ for (LongCursor o : offsets)
+ {
+ if (overflowCollisions.size() == OVERFLOW_TRAILER_CAPACITY)
+ throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf");
+ else
+ overflowCollisions.add(o.value);
+ }
return entry;
}
- /**
- * A leaf of the B+-Tree, that holds information about the row offset(s) for
- * the current token.
- *
- * Main 3 types of leaf entries are:
- * 1) simple leaf entry: holding just a single row offset
- * 2) packed collision leaf entry: holding two entries that would fit together into 168 bytes
- * 3) overflow entry: only holds offset in overflow trailer and amount of entries belonging to this leaf
- */
protected abstract class LeafEntry
{
protected final long token;
abstract public EntryType type();
+ abstract public int offsetData();
+ abstract public short offsetExtra();
public LeafEntry(final long tok)
{
token = tok;
}
- public abstract void serialize(ByteBuffer buf);
+ public void serialize(ByteBuffer buf)
+ {
+ buf.putShort((short) type().ordinal())
+ .putShort(offsetExtra())
+ .putLong(token)
+ .putInt(offsetData());
+ }
}
- /**
- * Simple leaf, that can store a single row offset, having the following format:
- *
- * [ type ] [ token ] [ partition offset ] [ row offset ]
- * [ 2b (short) ] [ 8b (long) ] [ 8b (long) ] [ 8b (long) ]
- */
+
+ // assumes there is a single offset and the offset is <= Integer.MAX_VALUE
protected class SimpleLeafEntry extends LeafEntry
{
- private final long partitionOffset;
- private final long rowOffset;
+ private final long offset;
- public SimpleLeafEntry(final long tok, final long partitionOffset, final long rowOffset)
+ public SimpleLeafEntry(final long tok, final long off)
{
super(tok);
- this.partitionOffset = partitionOffset;
- this.rowOffset = rowOffset;
+ offset = off;
}
public EntryType type()
@@ -408,38 +362,61 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
return EntryType.SIMPLE;
}
- @Override
- public void serialize(ByteBuffer buf)
+ public int offsetData()
{
- buf.putShort((short) type().ordinal())
- .putLong(token)
- .putLong(partitionOffset)
- .putLong(rowOffset);
+ return (int) offset;
+ }
+
+ public short offsetExtra()
+ {
+ return 0;
}
}
- /**
- * Packed collision entry, can store two offsets, if each one of their positions
- * fit into 4 bytes.
- * [ type ] [ token ] [ partition offset 1 ] [ row offset 1] [ partition offset 1 ] [ row offset 1]
- * [ 2b (short) ] [ 8b (long) ] [ 4b (int) ] [ 4b (int) ] [ 4b (int) ] [ 4b (int) ]
- */
- protected class PackedCollisionLeafEntry extends LeafEntry
+ // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET
+ // take the middle 32 bits of offset (or the top 32 when considering offset is max 48 bits)
+ // and store where offset is normally stored. take bottom 16 bits of offset and store in entry header
+ private class FactoredOffsetLeafEntry extends LeafEntry
{
- private final int partitionOffset1;
- private final int rowOffset1;
- private final int partitionOffset2;
- private final int rowOffset2;
+ private final long offset;
- public PackedCollisionLeafEntry(final long tok, final int partitionOffset1, final int rowOffset1,
- final int partitionOffset2, final int rowOffset2)
+ public FactoredOffsetLeafEntry(final long tok, final long off)
{
super(tok);
- this.partitionOffset1 = partitionOffset1;
- this.rowOffset1 = rowOffset1;
- this.partitionOffset2 = partitionOffset2;
- this.rowOffset2 = rowOffset2;
+ offset = off;
+ }
+ public EntryType type()
+ {
+ return EntryType.FACTORED;
+ }
+
+ public int offsetData()
+ {
+ return (int) (offset >>> Short.SIZE);
+ }
+
+ public short offsetExtra()
+ {
+ // extra offset is supposed to be an unsigned 16-bit integer
+ return (short) offset;
+ }
+ }
+
+ // holds an entry with two offsets that can be packed in an int & a short
+ // the int offset is stored where offset is normally stored. short offset is
+ // stored in entry header
+ private class PackedCollisionLeafEntry extends LeafEntry
+ {
+ private short smallerOffset;
+ private int largerOffset;
+
+ public PackedCollisionLeafEntry(final long tok, final long[] offs)
+ {
+ super(tok);
+
+ smallerOffset = (short) Math.min(offs[0], offs[1]);
+ largerOffset = (int) Math.max(offs[0], offs[1]);
}
public EntryType type()
@@ -447,27 +424,21 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
return EntryType.PACKED;
}
- @Override
- public void serialize(ByteBuffer buf)
+ public int offsetData()
{
- buf.putShort((short) type().ordinal())
- .putLong(token)
- .putInt(partitionOffset1)
- .putInt(rowOffset1)
- .putInt(partitionOffset2)
- .putInt(rowOffset2);
- }
- }
-
- /**
- * Overflow collision entry, holds an entry with three or more offsets, or two offsets
- * that cannot be packed into 16 bytes.
- * [ type ] [ token ] [ start index ] [ count ]
- * [ 2b (short) ] [ 8b (long) ] [ 8b (long) ] [ 8b (long) ]
- *
- * - [ start index ] is a position of first item belonging to this leaf entry in the overflow trailer
- * - [ count ] is the amount of items belonging to this leaf entry that are stored in the overflow trailer
- */
+ return largerOffset;
+ }
+
+ public short offsetExtra()
+ {
+ return smallerOffset;
+ }
+ }
+
+ // holds an entry with three or more offsets, or two offsets that cannot
+ // be packed into an int & a short. the index into the overflow list
+ // is stored where the offset is normally stored. the number of overflowed offsets
+ // for the entry is stored in the entry header
private class OverflowCollisionLeafEntry extends LeafEntry
{
private final short startIndex;
@@ -485,23 +456,20 @@ public abstract class AbstractTokenTreeBuilder implements TokenTreeBuilder
return EntryType.OVERFLOW;
}
- @Override
- public void serialize(ByteBuffer buf)
+ public int offsetData()
{
- buf.putShort((short) type().ordinal())
- .putLong(token)
- .putLong(startIndex)
- .putLong(count);
+ return startIndex;
}
+
+ public short offsetExtra()
+ {
+ return count;
+ }
+
}
+
}
- /**
- * Interior node consists of:
- * - (interior node) header
- * - tokens (serialized as longs, with count stored in header)
- * - child offsets
- */
protected class InteriorNode extends Node
{
protected List<Long> tokens = new ArrayList<>(TOKENS_PER_BLOCK);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
index 3fa0f06..3aa6f14 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
@@ -22,29 +22,30 @@ package org.apache.cassandra.index.sasi.disk;
*/
public class Descriptor
{
- public static enum Version
+ public static final String VERSION_AA = "aa";
+ public static final String VERSION_AB = "ab";
+ public static final String CURRENT_VERSION = VERSION_AB;
+ public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION);
+
+ public static class Version
{
- aa,
- ab,
- ac
- }
+ public final String version;
- public static final Version VERSION_AA = Version.aa;
- public static final Version VERSION_AB = Version.ab;
- public static final Version VERSION_AC = Version.ac;
+ public Version(String version)
+ {
+ this.version = version;
+ }
- public static final Version CURRENT_VERSION = Version.ac;
- public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION);
+ public String toString()
+ {
+ return version;
+ }
+ }
public final Version version;
public Descriptor(String v)
{
- this.version = Version.valueOf(v);
- }
-
- public Descriptor(Version v)
- {
- this.version = v;
+ this.version = new Version(v);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
index 6e3e163..2ddfd89 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/DynamicTokenTreeBuilder.java
@@ -20,14 +20,17 @@ package org.apache.cassandra.index.sasi.disk;
import java.nio.ByteBuffer;
import java.util.*;
-import com.carrotsearch.hppc.cursors.LongObjectCursor;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.Pair;
+
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
{
+ private final SortedMap<Long, LongSet> tokens = new TreeMap<>();
- private final SortedMap<Long, KeyOffsets> tokens = new TreeMap<>();
public DynamicTokenTreeBuilder()
{}
@@ -37,52 +40,54 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
add(data);
}
- public DynamicTokenTreeBuilder(SortedMap<Long, KeyOffsets> data)
+ public DynamicTokenTreeBuilder(SortedMap<Long, LongSet> data)
{
add(data);
}
- public void add(Long token, long partitionOffset, long rowOffset)
+ public void add(Long token, long keyPosition)
{
- KeyOffsets found = tokens.get(token);
+ LongSet found = tokens.get(token);
if (found == null)
- tokens.put(token, (found = new KeyOffsets(2)));
+ tokens.put(token, (found = new LongOpenHashSet(2)));
- found.put(partitionOffset, rowOffset);
+ found.add(keyPosition);
}
- public void add(Iterator<Pair<Long, KeyOffsets>> data)
+ public void add(Iterator<Pair<Long, LongSet>> data)
{
while (data.hasNext())
{
- Pair<Long, KeyOffsets> entry = data.next();
- for (LongObjectCursor<long[]> cursor : entry.right)
- for (long l : cursor.value)
- add(entry.left, cursor.key, l);
+ Pair<Long, LongSet> entry = data.next();
+ for (LongCursor l : entry.right)
+ add(entry.left, l.value);
}
}
- public void add(SortedMap<Long, KeyOffsets> data)
+ public void add(SortedMap<Long, LongSet> data)
{
- for (Map.Entry<Long, KeyOffsets> newEntry : data.entrySet())
+ for (Map.Entry<Long, LongSet> newEntry : data.entrySet())
{
- for (LongObjectCursor<long[]> cursor : newEntry.getValue())
- for (long l : cursor.value)
- add(newEntry.getKey(), cursor.key, l);
+ LongSet found = tokens.get(newEntry.getKey());
+ if (found == null)
+ tokens.put(newEntry.getKey(), (found = new LongOpenHashSet(4)));
+
+ for (LongCursor offset : newEntry.getValue())
+ found.add(offset.value);
}
}
- public Iterator<Pair<Long, KeyOffsets>> iterator()
+ public Iterator<Pair<Long, LongSet>> iterator()
{
- final Iterator<Map.Entry<Long, KeyOffsets>> iterator = tokens.entrySet().iterator();
- return new AbstractIterator<Pair<Long, KeyOffsets>>()
+ final Iterator<Map.Entry<Long, LongSet>> iterator = tokens.entrySet().iterator();
+ return new AbstractIterator<Pair<Long, LongSet>>()
{
- protected Pair<Long, KeyOffsets> computeNext()
+ protected Pair<Long, LongSet> computeNext()
{
if (!iterator.hasNext())
return endOfData();
- Map.Entry<Long, KeyOffsets> entry = iterator.next();
+ Map.Entry<Long, LongSet> entry = iterator.next();
return Pair.create(entry.getKey(), entry.getValue());
}
};
@@ -156,9 +161,9 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
private class DynamicLeaf extends Leaf
{
- private final SortedMap<Long, KeyOffsets> tokens;
+ private final SortedMap<Long, LongSet> tokens;
- DynamicLeaf(SortedMap<Long, KeyOffsets> data)
+ DynamicLeaf(SortedMap<Long, LongSet> data)
{
super(data.firstKey(), data.lastKey());
tokens = data;
@@ -176,7 +181,7 @@ public class DynamicTokenTreeBuilder extends AbstractTokenTreeBuilder
protected void serializeData(ByteBuffer buf)
{
- for (Map.Entry<Long, KeyOffsets> entry : tokens.entrySet())
+ for (Map.Entry<Long, LongSet> entry : tokens.entrySet())
createEntry(entry.getKey(), entry.getValue()).serialize(buf);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java b/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java
deleted file mode 100644
index db849fe..0000000
--- a/src/java/org/apache/cassandra/index/sasi/disk/KeyOffsets.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.index.sasi.disk;
-
-import java.util.*;
-
-import org.apache.commons.lang3.ArrayUtils;
-
-import com.carrotsearch.hppc.LongObjectOpenHashMap;
-import com.carrotsearch.hppc.cursors.LongObjectCursor;
-
-public class KeyOffsets extends LongObjectOpenHashMap<long[]>
-{
- public static final long NO_OFFSET = Long.MIN_VALUE;
-
- public KeyOffsets() {
- super(4);
- }
-
- public KeyOffsets(int initialCapacity) {
- super(initialCapacity);
- }
-
- public void put(long currentPartitionOffset, long currentRowOffset)
- {
- if (containsKey(currentPartitionOffset))
- super.put(currentPartitionOffset, append(get(currentPartitionOffset), currentRowOffset));
- else
- super.put(currentPartitionOffset, asArray(currentRowOffset));
- }
-
- public long[] put(long currentPartitionOffset, long[] currentRowOffset)
- {
- if (containsKey(currentPartitionOffset))
- return super.put(currentPartitionOffset, merge(get(currentPartitionOffset), currentRowOffset));
- else
- return super.put(currentPartitionOffset, currentRowOffset);
- }
-
- public boolean equals(Object obj)
- {
- if (!(obj instanceof KeyOffsets))
- return false;
-
- KeyOffsets other = (KeyOffsets) obj;
- if (other.size() != this.size())
- return false;
-
- for (LongObjectCursor<long[]> cursor : this)
- if (!Arrays.equals(cursor.value, other.get(cursor.key)))
- return false;
-
- return true;
- }
-
- @Override
- public String toString()
- {
- StringBuilder sb = new StringBuilder("KeyOffsets { ");
- forEach((a, b) -> {
- sb.append(a).append(": ").append(Arrays.toString(b));
- });
- sb.append(" }");
- return sb.toString();
- }
-
- // primitive array creation
- public static long[] asArray(long... vals)
- {
- return vals;
- }
-
- private static long[] merge(long[] arr1, long[] arr2)
- {
- long[] copy = new long[arr2.length];
- int written = 0;
- for (long l : arr2)
- {
- if (!ArrayUtils.contains(arr1, l))
- copy[written++] = l;
- }
-
- if (written == 0)
- return arr1;
-
- long[] merged = new long[arr1.length + written];
- System.arraycopy(arr1, 0, merged, 0, arr1.length);
- System.arraycopy(copy, 0, merged, arr1.length, written);
- return merged;
- }
-
- private static long[] append(long[] arr1, long v)
- {
- if (ArrayUtils.contains(arr1, v))
- return arr1;
- else
- return ArrayUtils.add(arr1, v);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
index 70d24a7..4d43cd9 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
@@ -22,7 +22,8 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
-import org.apache.cassandra.index.sasi.*;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.Term;
import org.apache.cassandra.index.sasi.plan.Expression;
import org.apache.cassandra.index.sasi.plan.Expression.Op;
import org.apache.cassandra.index.sasi.utils.MappedBuffer;
@@ -36,12 +37,12 @@ import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import static org.apache.cassandra.index.sasi.disk.OnDiskBlock.SearchResult;
-import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.TOKEN_BYTES;
public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
{
@@ -105,7 +106,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
protected final long indexSize;
protected final boolean hasMarkedPartials;
- protected final KeyFetcher keyFetcher;
+ protected final Function<Long, DecoratedKey> keyFetcher;
protected final String indexPath;
@@ -115,7 +116,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
protected final ByteBuffer minTerm, maxTerm, minKey, maxKey;
@SuppressWarnings("resource")
- public OnDiskIndex(File index, AbstractType<?> cmp, KeyFetcher keyReader)
+ public OnDiskIndex(File index, AbstractType<?> cmp, Function<Long, DecoratedKey> keyReader)
{
keyFetcher = keyReader;
@@ -634,7 +635,6 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
{
final long blockEnd = FBUtilities.align(content.position(), OnDiskIndexBuilder.BLOCK_SIZE);
- // ([int] -1 for sparse, offset for non-sparse)
if (isSparse())
return new PrefetchedTokensIterator(getSparseTokens());
@@ -658,7 +658,7 @@ public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
NavigableMap<Long, Token> individualTokens = new TreeMap<>();
for (int i = 0; i < size; i++)
{
- Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + TOKEN_BYTES * i), keyFetcher);
+ Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + (8 * i)), keyFetcher);
assert token != null;
individualTokens.put(token.get(), token);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
index b6e2da5..4946f06 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.dht.*;
import org.apache.cassandra.index.sasi.plan.Expression.Op;
import org.apache.cassandra.index.sasi.sa.IndexedTerm;
import org.apache.cassandra.index.sasi.sa.IntegralSA;
@@ -38,6 +37,7 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import com.carrotsearch.hppc.LongArrayList;
+import com.carrotsearch.hppc.LongSet;
import com.carrotsearch.hppc.ShortArrayList;
import com.google.common.annotations.VisibleForTesting;
@@ -163,7 +163,7 @@ public class OnDiskIndexBuilder
this.marksPartials = marksPartials;
}
- public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long partitionOffset, long rowOffset)
+ public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long keyPosition)
{
if (term.remaining() >= MAX_TERM_SIZE)
{
@@ -183,16 +183,16 @@ public class OnDiskIndexBuilder
estimatedBytes += 64 + 48 + term.remaining();
}
- tokens.add((Long) key.getToken().getTokenValue(), partitionOffset, rowOffset);
+ tokens.add((Long) key.getToken().getTokenValue(), keyPosition);
// calculate key range (based on actual key values) for current index
minKey = (minKey == null || keyComparator.compare(minKey, key.getKey()) > 0) ? key.getKey() : minKey;
maxKey = (maxKey == null || keyComparator.compare(maxKey, key.getKey()) < 0) ? key.getKey() : maxKey;
- // 84 ((boolean(1)*4) + (long(8)*4) + 24 + 24) bytes for the LongObjectOpenHashMap<long[]> created
- // when the keyPosition was added + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key).
+ // 60 ((boolean(1)*4) + (long(8)*4) + 24) bytes for the LongOpenHashSet created when the keyPosition was added
+ // + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key).
// in the case of hash collision for the token we may overestimate but this is extremely rare
- estimatedBytes += 84 + 40 + 8;
+ estimatedBytes += 60 + 40 + 8;
return this;
}
@@ -569,7 +569,7 @@ public class OnDiskIndexBuilder
}
}
- private class MutableDataBlock extends MutableBlock<InMemoryDataTerm>
+ private static class MutableDataBlock extends MutableBlock<InMemoryDataTerm>
{
private static final int MAX_KEYS_SPARSE = 5;
@@ -651,7 +651,7 @@ public class OnDiskIndexBuilder
{
term.serialize(buffer);
buffer.writeByte((byte) keys.getTokenCount());
- for (Pair<Long, KeyOffsets> key : keys)
+ for (Pair<Long, LongSet> key : keys)
buffer.writeLong(key.left);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
index c204883..9fa4e87 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
@@ -109,7 +109,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
currentKeyPosition = curPosition;
}
- public void nextUnfilteredCluster(Unfiltered unfiltered, long currentRowOffset)
+ public void nextUnfilteredCluster(Unfiltered unfiltered)
{
if (!unfiltered.isRow())
return;
@@ -129,15 +129,10 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
if (index == null)
indexes.put(column, (index = newIndex(columnIndex)));
- index.add(value.duplicate(), currentKey, currentKeyPosition, currentRowOffset);
+ index.add(value.duplicate(), currentKey, currentKeyPosition);
});
}
- public void nextUnfilteredCluster(Unfiltered unfilteredCluster)
- {
- throw new UnsupportedOperationException("SASI Index does not support direct row access.");
- }
-
public void complete()
{
if (isComplete)
@@ -202,7 +197,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
this.currentBuilder = newIndexBuilder();
}
- public void add(ByteBuffer term, DecoratedKey key, long partitoinOffset, long rowOffset)
+ public void add(ByteBuffer term, DecoratedKey key, long keyPosition)
{
if (term.remaining() == 0)
return;
@@ -240,7 +235,7 @@ public class PerSSTableIndexWriter implements SSTableFlushObserver
}
}
- currentBuilder.add(token, key, partitoinOffset, rowOffset);
+ currentBuilder.add(token, key, keyPosition);
isAdded = true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java b/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java
deleted file mode 100644
index fc5a2c0..0000000
--- a/src/java/org/apache/cassandra/index/sasi/disk/RowKey.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyclustering ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.index.sasi.disk;
-
-import java.util.*;
-import java.util.stream.*;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.utils.*;
-
-/**
- * Primary key of the found row, a combination of the Partition Key
- * and clustering that belongs to the row.
- */
-public class RowKey implements Comparable<RowKey>
-{
-
- public final DecoratedKey decoratedKey;
- public final Clustering clustering;
-
- private final ClusteringComparator comparator;
-
- public RowKey(DecoratedKey primaryKey, Clustering clustering, ClusteringComparator comparator)
- {
- this.decoratedKey = primaryKey;
- this.clustering = clustering;
- this.comparator = comparator;
- }
-
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- RowKey rowKey = (RowKey) o;
-
- if (decoratedKey != null ? !decoratedKey.equals(rowKey.decoratedKey) : rowKey.decoratedKey != null)
- return false;
- return clustering != null ? clustering.equals(rowKey.clustering) : rowKey.clustering == null;
- }
-
- public int hashCode()
- {
- return new HashCodeBuilder().append(decoratedKey).append(clustering).toHashCode();
- }
-
- public int compareTo(RowKey other)
- {
- int cmp = this.decoratedKey.compareTo(other.decoratedKey);
- if (cmp == 0 && clustering != null)
- {
- // Both clustering and rows should match
- if (clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING || other.clustering.kind() == ClusteringPrefix.Kind.STATIC_CLUSTERING)
- return 0;
-
- return comparator.compare(this.clustering, other.clustering);
- }
- else
- {
- return cmp;
- }
- }
-
- public static RowKeyComparator COMPARATOR = new RowKeyComparator();
-
- public String toString(CFMetaData metadata)
- {
- return String.format("RowKey: { pk : %s, clustering: %s}",
- metadata.getKeyValidator().getString(decoratedKey.getKey()),
- clustering.toString(metadata));
- }
-
- @Override
- public String toString()
- {
- return String.format("RowKey: { pk : %s, clustering: %s}",
- ByteBufferUtil.bytesToHex(decoratedKey.getKey()),
- String.join(",", Arrays.stream(clustering.getRawValues()).map(ByteBufferUtil::bytesToHex).collect(Collectors.toList())));
- }
-
- private static class RowKeyComparator implements Comparator<RowKey>
- {
- public int compare(RowKey o1, RowKey o2)
- {
- return o1.compareTo(o2);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
index 8a11d60..6e64c56 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/StaticTokenTreeBuilder.java
@@ -19,7 +19,8 @@ package org.apache.cassandra.index.sasi.disk;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Iterator;
+import java.util.SortedMap;
import org.apache.cassandra.index.sasi.utils.CombinedTerm;
import org.apache.cassandra.index.sasi.utils.RangeIterator;
@@ -27,6 +28,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.Pair;
+import com.carrotsearch.hppc.LongSet;
import com.google.common.collect.Iterators;
/**
@@ -61,17 +63,17 @@ public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder
combinedTerm = term;
}
- public void add(Long token, long partitionOffset, long rowOffset)
+ public void add(Long token, long keyPosition)
{
throw new UnsupportedOperationException();
}
- public void add(SortedMap<Long, KeyOffsets> data)
+ public void add(SortedMap<Long, LongSet> data)
{
throw new UnsupportedOperationException();
}
- public void add(Iterator<Pair<Long, KeyOffsets>> data)
+ public void add(Iterator<Pair<Long, LongSet>> data)
{
throw new UnsupportedOperationException();
}
@@ -81,12 +83,12 @@ public class StaticTokenTreeBuilder extends AbstractTokenTreeBuilder
return tokenCount == 0;
}
- public Iterator<Pair<Long, KeyOffsets>> iterator()
+ public Iterator<Pair<Long, LongSet>> iterator()
{
- @SuppressWarnings("resource") Iterator<Token> iterator = combinedTerm.getTokenIterator();
- return new AbstractIterator<Pair<Long, KeyOffsets>>()
+ Iterator<Token> iterator = combinedTerm.getTokenIterator();
+ return new AbstractIterator<Pair<Long, LongSet>>()
{
- protected Pair<Long, KeyOffsets> computeNext()
+ protected Pair<Long, LongSet> computeNext()
{
if (!iterator.hasNext())
return endOfData();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Token.java b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
index 8ea864f..4cd1ea3 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/Token.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
@@ -19,9 +19,12 @@ package org.apache.cassandra.index.sasi.disk;
import com.google.common.primitives.Longs;
-import org.apache.cassandra.index.sasi.utils.*;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.utils.CombinedValue;
-public abstract class Token implements CombinedValue<Long>, Iterable<RowKey>
+import com.carrotsearch.hppc.LongSet;
+
+public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKey>
{
protected final long token;
@@ -35,7 +38,7 @@ public abstract class Token implements CombinedValue<Long>, Iterable<RowKey>
return token;
}
- public abstract KeyOffsets getOffsets();
+ public abstract LongSet getOffsets();
public int compareTo(CombinedValue<Long> o)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/490c1c27/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
index 1969627..c69ce00 100644
--- a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
@@ -19,21 +19,22 @@ package org.apache.cassandra.index.sasi.disk;
import java.io.IOException;
import java.util.*;
-import java.util.stream.*;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.utils.AbstractIterator;
+import org.apache.cassandra.index.sasi.utils.CombinedValue;
+import org.apache.cassandra.index.sasi.utils.MappedBuffer;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.utils.MergeIterator;
+
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.carrotsearch.hppc.LongSet;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import org.apache.commons.lang3.builder.HashCodeBuilder;
-import com.carrotsearch.hppc.cursors.LongObjectCursor;
-import org.apache.cassandra.index.sasi.*;
-import org.apache.cassandra.index.sasi.disk.Descriptor.*;
-import org.apache.cassandra.index.sasi.utils.AbstractIterator;
-import org.apache.cassandra.index.sasi.utils.*;
-import org.apache.cassandra.utils.*;
-
-import static org.apache.cassandra.index.sasi.disk.Descriptor.Version.*;
-import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.*;
+import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType;
// Note: all of the seek-able offsets contained in TokenTree should be sizeof(long)
// even if currently only lower int portion of them if used, because that makes
@@ -41,6 +42,9 @@ import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.*;
// without any on-disk format changes and/or re-indexing if one day we'll have a need to.
public class TokenTree
{
+ private static final int LONG_BYTES = Long.SIZE / 8;
+ private static final int SHORT_BYTES = Short.SIZE / 8;
+
private final Descriptor descriptor;
private final MappedBuffer file;
private final long startPos;
@@ -62,7 +66,8 @@ public class TokenTree
file.position(startPos + TokenTreeBuilder.SHARED_HEADER_BYTES);
- validateMagic();
+ if (!validateMagic())
+ throw new IllegalArgumentException("invalid token tree");
tokenCount = file.getLong();
treeMinToken = file.getLong();
@@ -74,12 +79,12 @@ public class TokenTree
return tokenCount;
}
- public RangeIterator<Long, Token> iterator(KeyFetcher keyFetcher)
+ public RangeIterator<Long, Token> iterator(Function<Long, DecoratedKey> keyFetcher)
{
return new TokenTreeIterator(file.duplicate(), keyFetcher);
}
- public OnDiskToken get(final long searchToken, KeyFetcher keyFetcher)
+ public OnDiskToken get(final long searchToken, Function<Long, DecoratedKey> keyFetcher)
{
seekToLeaf(searchToken, file);
long leafStart = file.position();
@@ -90,24 +95,21 @@ public class TokenTree
file.position(leafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES);
- OnDiskToken token = getTokenAt(file, tokenIndex, leafSize, keyFetcher);
-
+ OnDiskToken token = OnDiskToken.getTokenAt(file, tokenIndex, leafSize, keyFetcher);
return token.get().equals(searchToken) ? token : null;
}
- private void validateMagic()
+ private boolean validateMagic()
{
- if (descriptor.version == aa)
- return;
-
- short magic = file.getShort();
- if (descriptor.version == Version.ab && magic == TokenTreeBuilder.AB_MAGIC)
- return;
-
- if (descriptor.version == Version.ac && magic == TokenTreeBuilder.AC_MAGIC)
- return;
-
- throw new IllegalArgumentException("invalid token tree. Written magic: '" + ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(magic)) + "'");
+ switch (descriptor.version.toString())
+ {
+ case Descriptor.VERSION_AA:
+ return true;
+ case Descriptor.VERSION_AB:
+ return TokenTreeBuilder.AB_MAGIC == file.getShort();
+ default:
+ return false;
+ }
}
// finds leaf that *could* contain token
@@ -134,16 +136,18 @@ public class TokenTree
long minToken = file.getLong();
long maxToken = file.getLong();
- long seekBase = blockStart + BLOCK_HEADER_BYTES;
+ long seekBase = blockStart + TokenTreeBuilder.BLOCK_HEADER_BYTES;
if (minToken > token)
{
// seek to beginning of child offsets to locate first child
- file.position(seekBase + tokenCount * TOKEN_BYTES);
+ file.position(seekBase + tokenCount * LONG_BYTES);
+ blockStart = (startPos + (int) file.getLong());
}
else if (maxToken < token)
{
// seek to end of child offsets to locate last child
- file.position(seekBase + (2 * tokenCount) * TOKEN_BYTES);
+ file.position(seekBase + (2 * tokenCount) * LONG_BYTES);
+ blockStart = (startPos + (int) file.getLong());
}
else
{
@@ -154,11 +158,12 @@ public class TokenTree
// file pointer is now at beginning of offsets
if (offsetIndex == tokenCount)
- file.position(file.position() + (offsetIndex * TOKEN_BYTES));
+ file.position(file.position() + (offsetIndex * LONG_BYTES));
else
- file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * TOKEN_BYTES);
+ file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * LONG_BYTES);
+
+ blockStart = (startPos + (int) file.getLong());
}
- blockStart = (startPos + (int) file.getLong());
}
}
@@ -167,7 +172,8 @@ public class TokenTree
short offsetIndex = 0;
for (int i = 0; i < tokenCount; i++)
{
- if (searchToken < file.getLong())
+ long readToken = file.getLong();
+ if (searchToken < readToken)
break;
offsetIndex++;
@@ -187,7 +193,10 @@ public class TokenTree
while (start <= end)
{
middle = start + ((end - start) >> 1);
- long token = file.getLong(base + middle * LEAF_ENTRY_BYTES + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES));
+
+ // each entry is 16 bytes wide, token is in bytes 4-11
+ long token = file.getLong(base + (middle * (2 * LONG_BYTES) + 4));
+
if (token == searchToken)
break;
@@ -200,9 +209,9 @@ public class TokenTree
return (short) middle;
}
- private class TokenTreeIterator extends RangeIterator<Long, Token>
+ public class TokenTreeIterator extends RangeIterator<Long, Token>
{
- private final KeyFetcher keyFetcher;
+ private final Function<Long, DecoratedKey> keyFetcher;
private final MappedBuffer file;
private long currentLeafStart;
@@ -215,7 +224,7 @@ public class TokenTree
protected boolean firstIteration = true;
private boolean lastLeaf;
- TokenTreeIterator(MappedBuffer file, KeyFetcher keyFetcher)
+ TokenTreeIterator(MappedBuffer file, Function<Long, DecoratedKey> keyFetcher)
{
super(treeMinToken, treeMaxToken, tokenCount);
@@ -305,13 +314,13 @@ public class TokenTree
private Token getTokenAt(int idx)
{
- return TokenTree.this.getTokenAt(file, idx, leafSize, keyFetcher);
+ return OnDiskToken.getTokenAt(file, idx, leafSize, keyFetcher);
}
private long getTokenPosition(int idx)
{
- // skip entry header to get position pointing directly at the entry's token
- return TokenTree.this.getEntryPosition(idx, file, descriptor) + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES);
+ // skip 4 byte entry header to get position pointing directly at the entry's token
+ return OnDiskToken.getEntryPosition(idx, file) + (2 * SHORT_BYTES);
}
private void seekToNextLeaf()
@@ -338,15 +347,15 @@ public class TokenTree
}
}
- public class OnDiskToken extends Token
+ public static class OnDiskToken extends Token
{
private final Set<TokenInfo> info = new HashSet<>(2);
- private final Set<RowKey> loadedKeys = new TreeSet<>(RowKey.COMPARATOR);
+ private final Set<DecoratedKey> loadedKeys = new TreeSet<>(DecoratedKey.comparator);
- private OnDiskToken(MappedBuffer buffer, long position, short leafSize, KeyFetcher keyFetcher)
+ public OnDiskToken(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
{
- super(buffer.getLong(position + (descriptor.version.compareTo(Version.ac) < 0 ? LEGACY_TOKEN_OFFSET_BYTES : TOKEN_OFFSET_BYTES)));
- info.add(new TokenInfo(buffer, position, leafSize, keyFetcher, descriptor));
+ super(buffer.getLong(position + (2 * SHORT_BYTES)));
+ info.add(new TokenInfo(buffer, position, leafSize, keyFetcher));
}
public void merge(CombinedValue<Long> other)
@@ -368,9 +377,9 @@ public class TokenTree
}
}
- public Iterator<RowKey> iterator()
+ public Iterator<DecoratedKey> iterator()
{
- List<Iterator<RowKey>> keys = new ArrayList<>(info.size());
+ List<Iterator<DecoratedKey>> keys = new ArrayList<>(info.size());
for (TokenInfo i : info)
keys.add(i.iterator());
@@ -378,72 +387,68 @@ public class TokenTree
if (!loadedKeys.isEmpty())
keys.add(loadedKeys.iterator());
- return MergeIterator.get(keys, RowKey.COMPARATOR, new MergeIterator.Reducer<RowKey, RowKey>()
+ return MergeIterator.get(keys, DecoratedKey.comparator, new MergeIterator.Reducer<DecoratedKey, DecoratedKey>()
{
- RowKey reduced = null;
+ DecoratedKey reduced = null;
public boolean trivialReduceIsTrivial()
{
return true;
}
- public void reduce(int idx, RowKey current)
+ public void reduce(int idx, DecoratedKey current)
{
reduced = current;
}
- protected RowKey getReduced()
+ protected DecoratedKey getReduced()
{
return reduced;
}
});
}
- public KeyOffsets getOffsets()
+ public LongSet getOffsets()
{
- KeyOffsets offsets = new KeyOffsets();
+ LongSet offsets = new LongOpenHashSet(4);
for (TokenInfo i : info)
{
- for (LongObjectCursor<long[]> offset : i.fetchOffsets())
- offsets.put(offset.key, offset.value);
+ for (long offset : i.fetchOffsets())
+ offsets.add(offset);
}
return offsets;
}
- }
- private OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, KeyFetcher keyFetcher)
- {
- return new OnDiskToken(buffer, getEntryPosition(idx, buffer, descriptor), leafSize, keyFetcher);
- }
-
- private long getEntryPosition(int idx, MappedBuffer file, Descriptor descriptor)
- {
- if (descriptor.version.compareTo(Version.ac) < 0)
- return file.position() + (idx * LEGACY_LEAF_ENTRY_BYTES);
+ public static OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, Function<Long, DecoratedKey> keyFetcher)
+ {
+ return new OnDiskToken(buffer, getEntryPosition(idx, buffer), leafSize, keyFetcher);
+ }
- // skip n entries, to the entry with the given index
- return file.position() + (idx * LEAF_ENTRY_BYTES);
+ private static long getEntryPosition(int idx, MappedBuffer file)
+ {
+ // info (4 bytes) + token (8 bytes) + offset (4 bytes) = 16 bytes
+ return file.position() + (idx * (2 * LONG_BYTES));
+ }
}
private static class TokenInfo
{
private final MappedBuffer buffer;
- private final KeyFetcher keyFetcher;
- private final Descriptor descriptor;
+ private final Function<Long, DecoratedKey> keyFetcher;
+
private final long position;
private final short leafSize;
- public TokenInfo(MappedBuffer buffer, long position, short leafSize, KeyFetcher keyFetcher, Descriptor descriptor)
+ public TokenInfo(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
{
this.keyFetcher = keyFetcher;
this.buffer = buffer;
this.position = position;
this.leafSize = leafSize;
- this.descriptor = descriptor;
}
- public Iterator<RowKey> iterator()
+ public Iterator<DecoratedKey> iterator()
{
return new KeyIterator(keyFetcher, fetchOffsets());
}
@@ -460,154 +465,59 @@ public class TokenTree
TokenInfo o = (TokenInfo) other;
return keyFetcher == o.keyFetcher && position == o.position;
-
}
- /**
- * Legacy leaf storage format (used for reading data formats before AC):
- *
- * [(short) leaf type][(short) offset extra bytes][(long) token][(int) offsetData]
- *
- * Many pairs can be encoded into long+int.
- *
- * Simple entry: offset fits into (int)
- *
- * [(short) leaf type][(short) offset extra bytes][(long) token][(int) offsetData]
- *
- * FactoredOffset: a single offset, offset fits into (long)+(int) bits:
- *
- * [(short) leaf type][(short) 16 bytes of remained offset][(long) token][(int) top 32 bits of offset]
- *
- * PackedCollisionEntry: packs the two offset entries into int and a short (if both of them fit into
- * (long) and one of them fits into (int))
- *
- * [(short) leaf type][(short) 16 the offset that'd fit into short][(long) token][(int) 32 bits of offset that'd fit into int]
- *
- * Otherwise, the rest gets packed into limited-size overflow collision entry
- *
- * [(short) leaf type][(short) count][(long) token][(int) start index]
- */
- private KeyOffsets fetchOffsetsLegacy()
+ private long[] fetchOffsets()
{
short info = buffer.getShort(position);
// offset extra is unsigned short (right-most 16 bits of 48 bits allowed for an offset)
- int offsetExtra = buffer.getShort(position + Short.BYTES) & 0xFFFF;
+ int offsetExtra = buffer.getShort(position + SHORT_BYTES) & 0xFFFF;
// is the it left-most (32-bit) base of the actual offset in the index file
- int offsetData = buffer.getInt(position + (2 * Short.BYTES) + Long.BYTES);
+ int offsetData = buffer.getInt(position + (2 * SHORT_BYTES) + LONG_BYTES);
EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK);
- KeyOffsets rowOffsets = new KeyOffsets();
switch (type)
{
case SIMPLE:
- rowOffsets.put(offsetData, KeyOffsets.NO_OFFSET);
- break;
+ return new long[] { offsetData };
+
case OVERFLOW:
- long offsetPos = (buffer.position() + (2 * (leafSize * Long.BYTES)) + (offsetData * Long.BYTES));
+ long[] offsets = new long[offsetExtra]; // offsetShort contains count of tokens
+ long offsetPos = (buffer.position() + (2 * (leafSize * LONG_BYTES)) + (offsetData * LONG_BYTES));
for (int i = 0; i < offsetExtra; i++)
- {
- long offset = buffer.getLong(offsetPos + (i * Long.BYTES));;
- rowOffsets.put(offset, KeyOffsets.NO_OFFSET);
- }
- break;
- case FACTORED:
- long offset = (((long) offsetData) << Short.SIZE) + offsetExtra;
- rowOffsets.put(offset, KeyOffsets.NO_OFFSET);
- break;
- case PACKED:
- rowOffsets.put(offsetExtra, KeyOffsets.NO_OFFSET);
- rowOffsets.put(offsetData, KeyOffsets.NO_OFFSET);
- default:
- throw new IllegalStateException("Unknown entry type: " + type);
- }
- return rowOffsets;
- }
+ offsets[i] = buffer.getLong(offsetPos + (i * LONG_BYTES));
- private KeyOffsets fetchOffsets()
- {
- if (descriptor.version.compareTo(Version.ac) < 0)
- return fetchOffsetsLegacy();
-
- short info = buffer.getShort(position);
- EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK);
+ return offsets;
- KeyOffsets rowOffsets = new KeyOffsets();
- long baseOffset = position + LEAF_ENTRY_TYPE_BYTES + TOKEN_BYTES;
- switch (type)
- {
- case SIMPLE:
- long partitionOffset = buffer.getLong(baseOffset);
- long rowOffset = buffer.getLong(baseOffset + LEAF_PARTITON_OFFSET_BYTES);
+ case FACTORED:
+ return new long[] { (((long) offsetData) << Short.SIZE) + offsetExtra };
- rowOffsets.put(partitionOffset, rowOffset);
- break;
case PACKED:
- long partitionOffset1 = buffer.getInt(baseOffset);
- long rowOffset1 = buffer.getInt(baseOffset + LEAF_PARTITON_OFFSET_PACKED_BYTES);
-
- long partitionOffset2 = buffer.getInt(baseOffset + LEAF_PARTITON_OFFSET_PACKED_BYTES + LEAF_ROW_OFFSET_PACKED_BYTES);
- long rowOffset2 = buffer.getInt(baseOffset + 2 * LEAF_PARTITON_OFFSET_PACKED_BYTES + LEAF_ROW_OFFSET_PACKED_BYTES);
+ return new long[] { offsetExtra, offsetData };
- rowOffsets.put(partitionOffset1, rowOffset1);
- rowOffsets.put(partitionOffset2, rowOffset2);
- break;
- case OVERFLOW:
- long collisionOffset = buffer.getLong(baseOffset);
- long count = buffer.getLong(baseOffset + LEAF_PARTITON_OFFSET_BYTES);
-
- // Skip leaves and collision offsets that do not belong to current token
- long offsetPos = buffer.position() + leafSize * LEAF_ENTRY_BYTES + collisionOffset * COLLISION_ENTRY_BYTES;
-
- for (int i = 0; i < count; i++)
- {
- long currentPartitionOffset = buffer.getLong(offsetPos + i * COLLISION_ENTRY_BYTES);
- long currentRowOffset = buffer.getLong(offsetPos + i * COLLISION_ENTRY_BYTES + LEAF_PARTITON_OFFSET_BYTES);
-
- rowOffsets.put(currentPartitionOffset, currentRowOffset);
- }
- break;
default:
throw new IllegalStateException("Unknown entry type: " + type);
}
-
-
- return rowOffsets;
}
}
- private static class KeyIterator extends AbstractIterator<RowKey>
+ private static class KeyIterator extends AbstractIterator<DecoratedKey>
{
- private final KeyFetcher keyFetcher;
- private final Iterator<LongObjectCursor<long[]>> offsets;
- private long currentPatitionKey;
- private PrimitiveIterator.OfLong currentCursor = null;
+ private final Function<Long, DecoratedKey> keyFetcher;
+ private final long[] offsets;
+ private int index = 0;
- public KeyIterator(KeyFetcher keyFetcher, KeyOffsets offsets)
+ public KeyIterator(Function<Long, DecoratedKey> keyFetcher, long[] offsets)
{
this.keyFetcher = keyFetcher;
- this.offsets = offsets.iterator();
+ this.offsets = offsets;
}
- public RowKey computeNext()
+ public DecoratedKey computeNext()
{
- if (currentCursor != null && currentCursor.hasNext())
- {
- return keyFetcher.getRowKey(currentPatitionKey, currentCursor.nextLong());
- }
- else if (offsets.hasNext())
- {
- LongObjectCursor<long[]> cursor = offsets.next();
- currentPatitionKey = cursor.key;
- currentCursor = LongStream.of(cursor.value).iterator();
-
- return keyFetcher.getRowKey(currentPatitionKey, currentCursor.nextLong());
- }
- else
- {
- return endOfData();
- }
+ return index < offsets.length ? keyFetcher.apply(offsets[index++]) : endOfData();
}
}
-}
+}
\ No newline at end of file