Posted to commits@cassandra.apache.org by xe...@apache.org on 2016/01/24 04:36:13 UTC
[11/14] cassandra git commit: Integrate SASI index into Cassandra
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/conf/view/PrefixTermTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/PrefixTermTree.java b/src/java/org/apache/cassandra/index/sasi/conf/view/PrefixTermTree.java
new file mode 100644
index 0000000..72b6daf
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/conf/view/PrefixTermTree.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.conf.view;
+
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.index.sasi.SSTableIndex;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.utils.trie.KeyAnalyzer;
+import org.apache.cassandra.index.sasi.utils.trie.PatriciaTrie;
+import org.apache.cassandra.index.sasi.utils.trie.Trie;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Interval;
+import org.apache.cassandra.utils.IntervalTree;
+
+import com.google.common.collect.Sets;
+
+/**
+ * This class is an extension over RangeTermTree for string terms.
+ * It is required because the interval tree can't handle matching when the
+ * search term is a prefix of the min/max term of a range, so for ascii/utf8
+ * fields we build an additional prefix trie (keyed on both the min and max
+ * terms of each index) and union the results of the prefix trie search
+ * with the results of the interval tree lookup.
+ */
+public class PrefixTermTree extends RangeTermTree
+{
+ private final OnDiskIndexBuilder.Mode mode;
+ private final Trie<ByteBuffer, Set<SSTableIndex>> trie;
+
+ public PrefixTermTree(ByteBuffer min, ByteBuffer max,
+ Trie<ByteBuffer, Set<SSTableIndex>> trie,
+ IntervalTree<ByteBuffer, SSTableIndex, Interval<ByteBuffer, SSTableIndex>> ranges,
+ OnDiskIndexBuilder.Mode mode)
+ {
+ super(min, max, ranges);
+
+ this.mode = mode;
+ this.trie = trie;
+ }
+
+ public Set<SSTableIndex> search(Expression e)
+ {
+ Map<ByteBuffer, Set<SSTableIndex>> indexes = (e == null || e.lower == null || mode == OnDiskIndexBuilder.Mode.CONTAINS)
+ ? trie : trie.prefixMap(e.lower.value);
+
+ Set<SSTableIndex> view = new HashSet<>(indexes.size());
+ indexes.values().forEach(view::addAll);
+
+ return Sets.union(view, super.search(e));
+ }
+
+ public static class Builder extends RangeTermTree.Builder
+ {
+ private final PatriciaTrie<ByteBuffer, Set<SSTableIndex>> trie;
+
+ protected Builder(OnDiskIndexBuilder.Mode mode, final AbstractType<?> comparator)
+ {
+ super(mode, comparator);
+ trie = new PatriciaTrie<>(new ByteBufferKeyAnalyzer(comparator));
+ }
+
+ public void addIndex(SSTableIndex index)
+ {
+ super.addIndex(index);
+ addTerm(index.minTerm(), index);
+ addTerm(index.maxTerm(), index);
+ }
+
+ public TermTree build()
+ {
+ return new PrefixTermTree(min, max, trie, IntervalTree.build(intervals), mode);
+ }
+
+ private void addTerm(ByteBuffer term, SSTableIndex index)
+ {
+ Set<SSTableIndex> indexes = trie.get(term);
+ if (indexes == null)
+ trie.put(term, (indexes = new HashSet<>()));
+
+ indexes.add(index);
+ }
+ }
+
+ private static class ByteBufferKeyAnalyzer implements KeyAnalyzer<ByteBuffer>
+ {
+ private final AbstractType<?> comparator;
+
+ public ByteBufferKeyAnalyzer(AbstractType<?> comparator)
+ {
+ this.comparator = comparator;
+ }
+
+ /**
+ * A bit mask where only the most significant bit of a byte is set
+ */
+ private static final int MSB = 1 << (Byte.SIZE - 1);
+
+ public int compare(ByteBuffer a, ByteBuffer b)
+ {
+ return comparator.compare(a, b);
+ }
+
+ public int lengthInBits(ByteBuffer o)
+ {
+ return o.remaining() * Byte.SIZE;
+ }
+
+ public boolean isBitSet(ByteBuffer key, int bitIndex)
+ {
+ if (bitIndex >= lengthInBits(key))
+ return false;
+
+ int index = bitIndex / Byte.SIZE;
+ int bit = bitIndex % Byte.SIZE;
+ return (key.get(index) & mask(bit)) != 0;
+ }
+
+ public int bitIndex(ByteBuffer key, ByteBuffer otherKey)
+ {
+ int length = Math.max(key.remaining(), otherKey.remaining());
+
+ boolean allNull = true;
+ for (int i = 0; i < length; i++)
+ {
+ byte b1 = valueAt(key, i);
+ byte b2 = valueAt(otherKey, i);
+
+ if (b1 != b2)
+ {
+ int xor = b1 ^ b2;
+ for (int j = 0; j < Byte.SIZE; j++)
+ {
+ if ((xor & mask(j)) != 0)
+ return (i * Byte.SIZE) + j;
+ }
+ }
+
+ if (b1 != 0)
+ allNull = false;
+ }
+
+ return allNull ? KeyAnalyzer.NULL_BIT_KEY : KeyAnalyzer.EQUAL_BIT_KEY;
+ }
+
+ public boolean isPrefix(ByteBuffer key, ByteBuffer prefix)
+ {
+ if (key.remaining() < prefix.remaining())
+ return false;
+
+ for (int i = 0; i < prefix.remaining(); i++)
+ {
+ if (key.get(i) != prefix.get(i))
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Returns the {@code byte} value at the given index.
+ */
+ private byte valueAt(ByteBuffer value, int index)
+ {
+ return index >= 0 && index < value.remaining() ? value.get(index) : 0;
+ }
+
+ /**
+ * Returns a bit mask where the given bit is set
+ */
+ private int mask(int bit)
+ {
+ return MSB >>> bit;
+ }
+ }
+}
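
To make the class comment concrete: a stabbing query against the term interval tree misses prefix searches because the prefix sorts before the range's min term, while a trie keyed on the min/max terms still matches. A minimal standalone sketch, using java.util.TreeMap to stand in for the PatriciaTrie, with invented term and index names:

    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class PrefixLookupSketch
    {
        public static void main(String[] args)
        {
            // an index whose min/max terms are "apple" and "apricot"; a stabbing
            // query for the point "ap" misses the interval [apple, apricot]
            // because "ap" sorts before "apple", but a trie keyed on the min/max
            // terms still matches, since both keys start with "ap"
            NavigableMap<String, String> trie = new TreeMap<>();
            trie.put("apple", "sstable-index-1");
            trie.put("apricot", "sstable-index-1");

            String prefix = "ap";
            // emulate Trie#prefixMap(prefix) with a sub-map over [prefix, prefix + MAX_VALUE)
            NavigableMap<String, String> matches =
                trie.subMap(prefix, true, prefix + Character.MAX_VALUE, false);
            System.out.println(matches.keySet()); // [apple, apricot]
        }
    }
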
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
new file mode 100644
index 0000000..62e5636
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/conf/view/RangeTermTree.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.conf.view;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.index.sasi.SSTableIndex;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.Interval;
+import org.apache.cassandra.utils.IntervalTree;
+
+public class RangeTermTree implements TermTree
+{
+ protected final ByteBuffer min, max;
+ protected final IntervalTree<ByteBuffer, SSTableIndex, Interval<ByteBuffer, SSTableIndex>> rangeTree;
+
+ public RangeTermTree(ByteBuffer min, ByteBuffer max, IntervalTree<ByteBuffer, SSTableIndex, Interval<ByteBuffer, SSTableIndex>> rangeTree)
+ {
+ this.min = min;
+ this.max = max;
+ this.rangeTree = rangeTree;
+ }
+
+ public Set<SSTableIndex> search(Expression e)
+ {
+ ByteBuffer minTerm = e.lower == null ? min : e.lower.value;
+ ByteBuffer maxTerm = e.upper == null ? max : e.upper.value;
+
+ return new HashSet<>(rangeTree.search(Interval.create(minTerm, maxTerm, (SSTableIndex) null)));
+ }
+
+ public int intervalCount()
+ {
+ return rangeTree.intervalCount();
+ }
+
+ static class Builder extends TermTree.Builder
+ {
+ protected final List<Interval<ByteBuffer, SSTableIndex>> intervals = new ArrayList<>();
+
+ protected Builder(OnDiskIndexBuilder.Mode mode, AbstractType<?> comparator)
+ {
+ super(mode, comparator);
+ }
+
+ public void addIndex(SSTableIndex index)
+ {
+ intervals.add(Interval.create(index.minTerm(), index.maxTerm(), index));
+ }
+
+ public TermTree build()
+ {
+ return new RangeTermTree(min, max, IntervalTree.build(intervals));
+ }
+ }
+}
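
The search above amounts to an overlap query: an sstable index qualifies when its [minTerm, maxTerm] range intersects the expression's bounds (missing bounds are widened to the global min/max). A minimal sketch of the predicate evaluated per stored range, with integers standing in for ByteBuffer terms:

    public class OverlapSketch
    {
        // the per-range test an interval tree search answers: does the stored
        // [indexMin, indexMax] term range intersect the query's [queryMin, queryMax]?
        static boolean overlaps(int queryMin, int queryMax, int indexMin, int indexMax)
        {
            return queryMin <= indexMax && indexMin <= queryMax;
        }

        public static void main(String[] args)
        {
            System.out.println(overlaps(5, 15, 10, 20)); // true: ranges intersect
            System.out.println(overlaps(5, 15, 16, 20)); // false: disjoint
        }
    }
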
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/conf/view/TermTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/TermTree.java b/src/java/org/apache/cassandra/index/sasi/conf/view/TermTree.java
new file mode 100644
index 0000000..a175e22
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/conf/view/TermTree.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.conf.view;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.cassandra.index.sasi.SSTableIndex;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public interface TermTree
+{
+ Set<SSTableIndex> search(Expression e);
+
+ int intervalCount();
+
+ abstract class Builder
+ {
+ protected final OnDiskIndexBuilder.Mode mode;
+ protected final AbstractType<?> comparator;
+ protected ByteBuffer min, max;
+
+ protected Builder(OnDiskIndexBuilder.Mode mode, AbstractType<?> comparator)
+ {
+ this.mode = mode;
+ this.comparator = comparator;
+ }
+
+ public final void add(SSTableIndex index)
+ {
+ addIndex(index);
+
+ min = min == null || comparator.compare(min, index.minTerm()) > 0 ? index.minTerm() : min;
+ max = max == null || comparator.compare(max, index.maxTerm()) < 0 ? index.maxTerm() : max;
+ }
+
+ protected abstract void addIndex(SSTableIndex index);
+
+ public abstract TermTree build();
+ }
+}
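
Builder.add is final so the min/max bookkeeping happens uniformly no matter which subclass is in use; subclasses only supply addIndex and build. A small sketch of that fold, with Strings standing in for ByteBuffer terms and invented bounds:

    import java.util.Arrays;
    import java.util.List;

    public class MinMaxFoldSketch
    {
        public static void main(String[] args)
        {
            // per-index (minTerm, maxTerm) pairs, values invented
            List<String[]> indexBounds = Arrays.asList(new String[]{ "f", "m" },
                                                       new String[]{ "a", "k" },
                                                       new String[]{ "c", "z" });
            String min = null, max = null;
            for (String[] bounds : indexBounds)
            {
                // the same fold as Builder.add, with String.compareTo standing in for the comparator
                min = min == null || min.compareTo(bounds[0]) > 0 ? bounds[0] : min;
                max = max == null || max.compareTo(bounds[1]) < 0 ? bounds[1] : max;
            }
            System.out.println(min + " " + max); // a z
        }
    }
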
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/conf/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/conf/view/View.java b/src/java/org/apache/cassandra/index/sasi/conf/view/View.java
new file mode 100644
index 0000000..378c3c6
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/conf/view/View.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.conf.view;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.index.sasi.SSTableIndex;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.Interval;
+import org.apache.cassandra.utils.IntervalTree;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+public class View implements Iterable<SSTableIndex>
+{
+ private final Map<Descriptor, SSTableIndex> view;
+
+ private final TermTree termTree;
+ private final IntervalTree<ByteBuffer, SSTableIndex, Interval<ByteBuffer, SSTableIndex>> keyIntervalTree;
+
+ public View(ColumnIndex index, Set<SSTableIndex> indexes)
+ {
+ this(index, Collections.<SSTableIndex>emptyList(), Collections.<SSTableReader>emptyList(), indexes);
+ }
+
+ public View(ColumnIndex index,
+ Collection<SSTableIndex> currentView,
+ Collection<SSTableReader> oldSSTables,
+ Set<SSTableIndex> newIndexes)
+ {
+ Map<Descriptor, SSTableIndex> newView = new HashMap<>();
+
+ AbstractType<?> validator = index.getValidator();
+ TermTree.Builder termTreeBuilder = (validator instanceof AsciiType || validator instanceof UTF8Type)
+ ? new PrefixTermTree.Builder(index.getMode().mode, validator)
+ : new RangeTermTree.Builder(index.getMode().mode, validator);
+
+ List<Interval<ByteBuffer, SSTableIndex>> keyIntervals = new ArrayList<>();
+ for (SSTableIndex sstableIndex : Iterables.concat(currentView, newIndexes))
+ {
+ SSTableReader sstable = sstableIndex.getSSTable();
+ if (oldSSTables.contains(sstable) || sstable.isMarkedCompacted() || newView.containsKey(sstable.descriptor))
+ {
+ sstableIndex.release();
+ continue;
+ }
+
+ newView.put(sstable.descriptor, sstableIndex);
+
+ termTreeBuilder.add(sstableIndex);
+ keyIntervals.add(Interval.create(sstableIndex.minKey(), sstableIndex.maxKey(), sstableIndex));
+ }
+
+ this.view = newView;
+ this.termTree = termTreeBuilder.build();
+ this.keyIntervalTree = IntervalTree.build(keyIntervals);
+
+ if (keyIntervalTree.intervalCount() != termTree.intervalCount())
+ throw new IllegalStateException(String.format("mismatched sizes for intervals tree for keys vs terms: %d != %d", keyIntervalTree.intervalCount(), termTree.intervalCount()));
+ }
+
+ public Set<SSTableIndex> match(final Set<SSTableReader> scope, Expression expression)
+ {
+ return Sets.filter(termTree.search(expression), index -> scope.contains(index.getSSTable()));
+ }
+
+ public List<SSTableIndex> match(ByteBuffer minKey, ByteBuffer maxKey)
+ {
+ return keyIntervalTree.search(Interval.create(minKey, maxKey, (SSTableIndex) null));
+ }
+
+ public Iterator<SSTableIndex> iterator()
+ {
+ return view.values().iterator();
+ }
+
+ public Collection<SSTableIndex> getIndexes()
+ {
+ return view.values();
+ }
+}
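
The constructor above is effectively a copy-on-write refresh: current and new indexes are merged, and anything released by compaction, superseded, or already present is skipped. A condensed sketch of that merge with invented stand-in types (only the dedup/skip logic is shown):

    import java.util.*;

    public class ViewRefreshSketch
    {
        static final class Index
        {
            final String descriptor; // stand-in for the sstable descriptor
            final boolean obsolete;  // stand-in for "in oldSSTables or marked compacted"

            Index(String descriptor, boolean obsolete)
            {
                this.descriptor = descriptor;
                this.obsolete = obsolete;
            }
        }

        static Map<String, Index> refresh(Collection<Index> currentView, Collection<Index> newIndexes)
        {
            Map<String, Index> newView = new HashMap<>();
            List<Index> merged = new ArrayList<>(currentView);
            merged.addAll(newIndexes);

            for (Index index : merged)
            {
                // the real constructor also calls release() on every skipped index,
                // since the new view takes no reference to it
                if (index.obsolete || newView.containsKey(index.descriptor))
                    continue;

                newView.put(index.descriptor, index);
            }
            return newView;
        }

        public static void main(String[] args)
        {
            Map<String, Index> view = refresh(Arrays.asList(new Index("sstable-1", false),
                                                            new Index("sstable-2", true)),
                                              Arrays.asList(new Index("sstable-3", false)));
            System.out.println(view.keySet()); // [sstable-1, sstable-3] (order unspecified)
        }
    }
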
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
new file mode 100644
index 0000000..a719f50
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Descriptor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+/**
+ * Object descriptor for SSTableAttachedSecondaryIndex files. Similar to, and based upon, the sstable descriptor.
+ */
+public class Descriptor
+{
+ public static final String VERSION_AA = "aa";
+ public static final String VERSION_AB = "ab";
+ public static final String CURRENT_VERSION = VERSION_AB;
+ public static final Descriptor CURRENT = new Descriptor(CURRENT_VERSION);
+
+ public static class Version
+ {
+ public final String version;
+
+ public Version(String version)
+ {
+ this.version = version;
+ }
+
+ public String toString()
+ {
+ return version;
+ }
+ }
+
+ public final Version version;
+
+ public Descriptor(String v)
+ {
+ this.version = new Version(v);
+ }
+}
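
The version string is the first field of every SASI index file: OnDiskIndexBuilder.finish writes it with writeUTF and the OnDiskIndex constructor reads it back with readUTF before anything else. A self-contained sketch of that round trip over an in-memory stream:

    import java.io.*;

    public class VersionRoundTripSketch
    {
        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes))
            {
                // OnDiskIndexBuilder.finish writes the version string first ...
                out.writeUTF("ab"); // Descriptor.CURRENT_VERSION
            }

            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())))
            {
                // ... and the OnDiskIndex constructor reads it back with readUTF
                System.out.println(in.readUTF()); // ab
            }
        }
    }
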
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/OnDiskBlock.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskBlock.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskBlock.java
new file mode 100644
index 0000000..32cda53
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskBlock.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.index.sasi.Term;
+import org.apache.cassandra.index.sasi.utils.MappedBuffer;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public abstract class OnDiskBlock<T extends Term>
+{
+ public enum BlockType
+ {
+ POINTER, DATA
+ }
+
+ // this contains offsets of the terms and term data
+ protected final MappedBuffer blockIndex;
+ protected final int blockIndexSize;
+
+ protected final boolean hasCombinedIndex;
+ protected final TokenTree combinedIndex;
+
+ public OnDiskBlock(Descriptor descriptor, MappedBuffer block, BlockType blockType)
+ {
+ blockIndex = block;
+
+ if (blockType == BlockType.POINTER)
+ {
+ hasCombinedIndex = false;
+ combinedIndex = null;
+ blockIndexSize = block.getInt() << 1; // num terms * sizeof(short)
+ return;
+ }
+
+ long blockOffset = block.position();
+ int combinedIndexOffset = block.getInt(blockOffset + OnDiskIndexBuilder.BLOCK_SIZE);
+
+ hasCombinedIndex = (combinedIndexOffset >= 0);
+ long blockIndexOffset = blockOffset + OnDiskIndexBuilder.BLOCK_SIZE + 4 + combinedIndexOffset;
+
+ combinedIndex = hasCombinedIndex ? new TokenTree(descriptor, blockIndex.duplicate().position(blockIndexOffset)) : null;
+ blockIndexSize = block.getInt() * 2;
+ }
+
+ public SearchResult<T> search(AbstractType<?> comparator, ByteBuffer query)
+ {
+ int cmp = -1, start = 0, end = termCount() - 1, middle = 0;
+
+ T element = null;
+ while (start <= end)
+ {
+ middle = start + ((end - start) >> 1);
+ element = getTerm(middle);
+
+ cmp = element.compareTo(comparator, query);
+ if (cmp == 0)
+ return new SearchResult<>(element, cmp, middle);
+ else if (cmp < 0)
+ start = middle + 1;
+ else
+ end = middle - 1;
+ }
+
+ return new SearchResult<>(element, cmp, middle);
+ }
+
+ protected T getTerm(int index)
+ {
+ MappedBuffer dup = blockIndex.duplicate();
+ long startsAt = getTermPosition(index);
+ if (termCount() - 1 == index) // last element
+ dup.position(startsAt);
+ else
+ dup.position(startsAt).limit(getTermPosition(index + 1));
+
+ return cast(dup);
+ }
+
+ protected long getTermPosition(int idx)
+ {
+ return getTermPosition(blockIndex, idx, blockIndexSize);
+ }
+
+ protected int termCount()
+ {
+ return blockIndexSize >> 1;
+ }
+
+ protected abstract T cast(MappedBuffer data);
+
+ static long getTermPosition(MappedBuffer data, int idx, int indexSize)
+ {
+ idx <<= 1;
+ assert idx < indexSize;
+ return data.position() + indexSize + data.getShort(data.position() + idx);
+ }
+
+ public TokenTree getBlockIndex()
+ {
+ return combinedIndex;
+ }
+
+ public int minOffset(OnDiskIndex.IteratorOrder order)
+ {
+ return order == OnDiskIndex.IteratorOrder.DESC ? 0 : termCount() - 1;
+ }
+
+ public int maxOffset(OnDiskIndex.IteratorOrder order)
+ {
+ return minOffset(order) == 0 ? termCount() - 1 : 0;
+ }
+
+ public static class SearchResult<T>
+ {
+ public final T result;
+ public final int index, cmp;
+
+ public SearchResult(T result, int cmp, int index)
+ {
+ this.result = result;
+ this.index = index;
+ this.cmp = cmp;
+ }
+ }
+}
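
A block's index is laid out as an int term count, followed by one short offset per term (relative to the end of the offset table), followed by the term data itself; getTermPosition simply adds those pieces up. A self-contained sketch of that layout over a plain heap ByteBuffer (the real blocks are memory-mapped and block-aligned; the two terms here are invented):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class BlockLayoutSketch
    {
        public static void main(String[] args)
        {
            byte[] a = "apple".getBytes(StandardCharsets.UTF_8);
            byte[] b = "banana".getBytes(StandardCharsets.UTF_8);

            ByteBuffer block = ByteBuffer.allocate(64);
            block.putInt(2);                  // term count
            block.putShort((short) 0);        // offset of term 0, relative to the data section
            block.putShort((short) a.length); // offset of term 1
            block.put(a).put(b);
            block.flip();

            int indexSize = block.getInt() * 2;          // count * sizeof(short), as in the constructor
            int dataStart = block.position() + indexSize;

            // the equivalent of getTermPosition(data, 1, indexSize)
            int term1 = dataStart + block.getShort(block.position() + 2);
            System.out.println(term1); // absolute start of "banana" inside the block
        }
    }
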
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
new file mode 100644
index 0000000..0f9e389
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndex.java
@@ -0,0 +1,773 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.Term;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.plan.Expression.Op;
+import org.apache.cassandra.index.sasi.utils.MappedBuffer;
+import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
+import org.apache.cassandra.index.sasi.utils.AbstractIterator;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
+import static org.apache.cassandra.index.sasi.disk.OnDiskBlock.SearchResult;
+
+public class OnDiskIndex implements Iterable<OnDiskIndex.DataTerm>, Closeable
+{
+ public enum IteratorOrder
+ {
+ DESC(1), ASC(-1);
+
+ public final int step;
+
+ IteratorOrder(int step)
+ {
+ this.step = step;
+ }
+
+ public int startAt(OnDiskBlock<DataTerm> block, Expression e)
+ {
+ switch (this)
+ {
+ case DESC:
+ return e.lower == null
+ ? 0
+ : startAt(block.search(e.validator, e.lower.value), e.lower.inclusive);
+
+ case ASC:
+ return e.upper == null
+ ? block.termCount() - 1
+ : startAt(block.search(e.validator, e.upper.value), e.upper.inclusive);
+
+ default:
+ throw new IllegalArgumentException("Unknown order: " + this);
+ }
+ }
+
+ public int startAt(SearchResult<DataTerm> found, boolean inclusive)
+ {
+ switch (this)
+ {
+ case DESC:
+ if (found.cmp < 0)
+ return found.index + 1;
+
+ return inclusive || found.cmp != 0 ? found.index : found.index + 1;
+
+ case ASC:
+ if (found.cmp < 0) // search term was bigger then whole data set
+ return found.index;
+ return inclusive && (found.cmp == 0 || found.cmp < 0) ? found.index : found.index - 1;
+
+ default:
+ throw new IllegalArgumentException("Unknown order: " + this);
+ }
+ }
+ }
+
+ public final Descriptor descriptor;
+ protected final OnDiskIndexBuilder.Mode mode;
+ protected final OnDiskIndexBuilder.TermSize termSize;
+
+ protected final AbstractType<?> comparator;
+ protected final MappedBuffer indexFile;
+ protected final long indexSize;
+
+ protected final Function<Long, DecoratedKey> keyFetcher;
+
+ protected final String indexPath;
+
+ protected final PointerLevel[] levels;
+ protected final DataLevel dataLevel;
+
+ protected final ByteBuffer minTerm, maxTerm, minKey, maxKey;
+
+ public OnDiskIndex(File index, AbstractType<?> cmp, Function<Long, DecoratedKey> keyReader)
+ {
+ keyFetcher = keyReader;
+
+ comparator = cmp;
+ indexPath = index.getAbsolutePath();
+
+ RandomAccessFile backingFile = null;
+ try
+ {
+ backingFile = new RandomAccessFile(index, "r");
+
+ descriptor = new Descriptor(backingFile.readUTF());
+
+ termSize = OnDiskIndexBuilder.TermSize.of(backingFile.readShort());
+
+ minTerm = ByteBufferUtil.readWithShortLength(backingFile);
+ maxTerm = ByteBufferUtil.readWithShortLength(backingFile);
+
+ minKey = ByteBufferUtil.readWithShortLength(backingFile);
+ maxKey = ByteBufferUtil.readWithShortLength(backingFile);
+
+ mode = OnDiskIndexBuilder.Mode.mode(backingFile.readUTF());
+
+ indexSize = backingFile.length();
+ indexFile = new MappedBuffer(new ChannelProxy(indexPath, backingFile.getChannel()));
+
+ // start of the levels
+ indexFile.position(indexFile.getLong(indexSize - 8));
+
+ int numLevels = indexFile.getInt();
+ levels = new PointerLevel[numLevels];
+ for (int i = 0; i < levels.length; i++)
+ {
+ int blockCount = indexFile.getInt();
+ levels[i] = new PointerLevel(indexFile.position(), blockCount);
+ indexFile.position(indexFile.position() + blockCount * 8);
+ }
+
+ int blockCount = indexFile.getInt();
+ dataLevel = new DataLevel(indexFile.position(), blockCount);
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, index);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(backingFile);
+ }
+ }
+
+ public ByteBuffer minTerm()
+ {
+ return minTerm;
+ }
+
+ public ByteBuffer maxTerm()
+ {
+ return maxTerm;
+ }
+
+ public ByteBuffer minKey()
+ {
+ return minKey;
+ }
+
+ public ByteBuffer maxKey()
+ {
+ return maxKey;
+ }
+
+ public DataTerm min()
+ {
+ return dataLevel.getBlock(0).getTerm(0);
+ }
+
+ public DataTerm max()
+ {
+ DataBlock block = dataLevel.getBlock(dataLevel.blockCount - 1);
+ return block.getTerm(block.termCount() - 1);
+ }
+
+ /**
+ * Search the index file for rows matching the given expression.
+ *
+ * @param exp The expression to use for the query.
+ *
+ * @return Iterator over the tokens of every term which falls within the expression's range.
+ */
+ public RangeIterator<Long, Token> search(Expression exp)
+ {
+ // convert single NOT_EQ to range with exclusion
+ final Expression expression = (exp.getOp() != Op.NOT_EQ)
+ ? exp
+ : new Expression(exp).setOp(Op.RANGE)
+ .setLower(new Expression.Bound(minTerm, true))
+ .setUpper(new Expression.Bound(maxTerm, true))
+ .addExclusion(exp.lower.value);
+
+ List<ByteBuffer> exclusions = new ArrayList<>(expression.exclusions.size());
+
+ Iterables.addAll(exclusions, expression.exclusions.stream().filter(exclusion -> {
+ // accept only exclusions which are in the bounds of lower/upper
+ return !(expression.lower != null && comparator.compare(exclusion, expression.lower.value) < 0)
+ && !(expression.upper != null && comparator.compare(exclusion, expression.upper.value) > 0);
+ }).collect(Collectors.toList()));
+
+ Collections.sort(exclusions, comparator);
+
+ if (exclusions.size() == 0)
+ return searchRange(expression);
+
+ List<Expression> ranges = new ArrayList<>(exclusions.size());
+
+ // calculate range splits based on the sorted exclusions
+ Iterator<ByteBuffer> exclusionsIterator = exclusions.iterator();
+
+ Expression.Bound min = expression.lower, max = null;
+ while (exclusionsIterator.hasNext())
+ {
+ max = new Expression.Bound(exclusionsIterator.next(), false);
+ ranges.add(new Expression(expression).setOp(Op.RANGE).setLower(min).setUpper(max));
+ min = max;
+ }
+
+ assert max != null;
+ ranges.add(new Expression(expression).setOp(Op.RANGE).setLower(max).setUpper(expression.upper));
+
+ RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+ for (Expression e : ranges)
+ {
+ RangeIterator<Long, Token> range = searchRange(e);
+ if (range != null)
+ builder.add(range);
+ }
+
+ return builder.build();
+ }
+
+ private RangeIterator<Long, Token> searchRange(Expression range)
+ {
+ Expression.Bound lower = range.lower;
+ Expression.Bound upper = range.upper;
+
+ int lowerBlock = lower == null ? 0 : getDataBlock(lower.value);
+ int upperBlock = upper == null
+ ? dataLevel.blockCount - 1
+ // optimization so we don't have to fetch upperBlock when query has lower == upper
+ : (lower != null && comparator.compare(lower.value, upper.value) == 0) ? lowerBlock : getDataBlock(upper.value);
+
+ return (mode != OnDiskIndexBuilder.Mode.SPARSE || lowerBlock == upperBlock || upperBlock - lowerBlock <= 1)
+ ? searchPoint(lowerBlock, range)
+ : searchRange(lowerBlock, lower, upperBlock, upper);
+ }
+
+ private RangeIterator<Long, Token> searchRange(int lowerBlock, Expression.Bound lower, int upperBlock, Expression.Bound upper)
+ {
+ // if lower is at the beginning of the block that means we can just do a single iterator per block
+ SearchResult<DataTerm> lowerPosition = (lower == null) ? null : searchIndex(lower.value, lowerBlock);
+ SearchResult<DataTerm> upperPosition = (upper == null) ? null : searchIndex(upper.value, upperBlock);
+
+ RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+
+ // optimistically assume that first and last blocks are full block reads, saves at least 3 'else' conditions
+ int firstFullBlockIdx = lowerBlock, lastFullBlockIdx = upperBlock;
+
+ // 'lower' doesn't cover the whole block, so we need to do a partial iteration.
+ // Two reasons why that can happen:
+ // - 'lower' is not the first element of the block
+ // - 'lower' is the first element but is not inclusive in the query
+ if (lowerPosition != null && (lowerPosition.index > 0 || !lower.inclusive))
+ {
+ DataBlock block = dataLevel.getBlock(lowerBlock);
+ int start = (lower.inclusive || lowerPosition.cmp != 0) ? lowerPosition.index : lowerPosition.index + 1;
+
+ builder.add(block.getRange(start, block.termCount()));
+ firstFullBlockIdx = lowerBlock + 1;
+ }
+
+ if (upperPosition != null)
+ {
+ DataBlock block = dataLevel.getBlock(upperBlock);
+ int lastIndex = block.termCount() - 1;
+
+ // The same as with 'lower', but here we need to check whether 'upper' is the last element
+ // of the block, which means that we only have to read individual results if:
+ // - it *is not* the last element, or
+ // - it *is*, but shouldn't be included (dictated by upper.inclusive)
+ if (upperPosition.index != lastIndex || !upper.inclusive)
+ {
+ int end = (upperPosition.cmp < 0 || (upperPosition.cmp == 0 && upper.inclusive))
+ ? upperPosition.index + 1 : upperPosition.index;
+
+ builder.add(block.getRange(0, end));
+ lastFullBlockIdx = upperBlock - 1;
+ }
+ }
+
+ int totalSuperBlocks = (lastFullBlockIdx - firstFullBlockIdx) / OnDiskIndexBuilder.SUPER_BLOCK_SIZE;
+
+ // if there are no super-blocks, we can simply read all of the block iterators in sequence
+ if (totalSuperBlocks == 0)
+ {
+ for (int i = firstFullBlockIdx; i <= lastFullBlockIdx; i++)
+ builder.add(dataLevel.getBlock(i).getBlockIndex().iterator(keyFetcher));
+
+ return builder.build();
+ }
+
+ // first read all of the blocks which come before the first super-block boundary,
+ // e.g. if the block range is (1, 9) and super-block-size = 4, we read blocks 1, 2, 3
+ // individually; blocks 4 - 7 are covered by a super-block, and 8, 9 are the remainder.
+
+ int superBlockAlignedStart = firstFullBlockIdx == 0 ? 0 : (int) FBUtilities.align(firstFullBlockIdx, OnDiskIndexBuilder.SUPER_BLOCK_SIZE);
+ for (int blockIdx = firstFullBlockIdx; blockIdx < Math.min(superBlockAlignedStart, lastFullBlockIdx); blockIdx++)
+ builder.add(getBlockIterator(blockIdx));
+
+ // now read all of the super-blocks matched by the request; continuing the example above,
+ // that's the super-block with index 1 (which covers blocks 4 through 7)
+
+ int superBlockIdx = superBlockAlignedStart / OnDiskIndexBuilder.SUPER_BLOCK_SIZE;
+ for (int offset = 0; offset < totalSuperBlocks - 1; offset++)
+ builder.add(dataLevel.getSuperBlock(superBlockIdx++).iterator());
+
+ // finally, do the remainder read; in the example above that's blocks 8 and 9, because
+ // we have read past the last super-block boundary but not enough to cover the next one.
+
+ int lastCoveredBlock = superBlockIdx * OnDiskIndexBuilder.SUPER_BLOCK_SIZE;
+ for (int offset = 0; offset <= (lastFullBlockIdx - lastCoveredBlock); offset++)
+ builder.add(getBlockIterator(lastCoveredBlock + offset));
+
+ return builder.build();
+ }
+
+ private RangeIterator<Long, Token> searchPoint(int lowerBlock, Expression expression)
+ {
+ Iterator<DataTerm> terms = new TermIterator(lowerBlock, expression, IteratorOrder.DESC);
+ RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+
+ while (terms.hasNext())
+ {
+ try
+ {
+ builder.add(terms.next().getTokens());
+ }
+ finally
+ {
+ expression.checkpoint();
+ }
+ }
+
+ return builder.build();
+ }
+
+ private RangeIterator<Long, Token> getBlockIterator(int blockIdx)
+ {
+ DataBlock block = dataLevel.getBlock(blockIdx);
+ return (block.hasCombinedIndex)
+ ? block.getBlockIndex().iterator(keyFetcher)
+ : block.getRange(0, block.termCount());
+ }
+
+ public Iterator<DataTerm> iteratorAt(ByteBuffer query, IteratorOrder order, boolean inclusive)
+ {
+ Expression e = new Expression("", comparator);
+ Expression.Bound bound = new Expression.Bound(query, inclusive);
+
+ switch (order)
+ {
+ case DESC:
+ e.setLower(bound);
+ break;
+
+ case ASC:
+ e.setUpper(bound);
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unknown order: " + order);
+ }
+
+ return new TermIterator(levels.length == 0 ? 0 : getBlockIdx(findPointer(query), query), e, order);
+ }
+
+ private int getDataBlock(ByteBuffer query)
+ {
+ return levels.length == 0 ? 0 : getBlockIdx(findPointer(query), query);
+ }
+
+ public Iterator<DataTerm> iterator()
+ {
+ return new TermIterator(0, new Expression("", comparator), IteratorOrder.DESC);
+ }
+
+ public void close() throws IOException
+ {
+ FileUtils.closeQuietly(indexFile);
+ }
+
+ private PointerTerm findPointer(ByteBuffer query)
+ {
+ PointerTerm ptr = null;
+ for (PointerLevel level : levels)
+ {
+ if ((ptr = level.getPointer(ptr, query)) == null)
+ return null;
+ }
+
+ return ptr;
+ }
+
+ private SearchResult<DataTerm> searchIndex(ByteBuffer query, int blockIdx)
+ {
+ return dataLevel.getBlock(blockIdx).search(comparator, query);
+ }
+
+ private int getBlockIdx(PointerTerm ptr, ByteBuffer query)
+ {
+ int blockIdx = 0;
+ if (ptr != null)
+ {
+ int cmp = ptr.compareTo(comparator, query);
+ blockIdx = (cmp == 0 || cmp > 0) ? ptr.getBlock() : ptr.getBlock() + 1;
+ }
+
+ return blockIdx;
+ }
+
+ protected class PointerLevel extends Level<PointerBlock>
+ {
+ public PointerLevel(long offset, int count)
+ {
+ super(offset, count);
+ }
+
+ public PointerTerm getPointer(PointerTerm parent, ByteBuffer query)
+ {
+ return getBlock(getBlockIdx(parent, query)).search(comparator, query).result;
+ }
+
+ protected PointerBlock cast(MappedBuffer block)
+ {
+ return new PointerBlock(block);
+ }
+ }
+
+ protected class DataLevel extends Level<DataBlock>
+ {
+ protected final int superBlockCnt;
+ protected final long superBlocksOffset;
+
+ public DataLevel(long offset, int count)
+ {
+ super(offset, count);
+ long baseOffset = blockOffsets + blockCount * 8;
+ superBlockCnt = indexFile.getInt(baseOffset);
+ superBlocksOffset = baseOffset + 4;
+ }
+
+ protected DataBlock cast(MappedBuffer block)
+ {
+ return new DataBlock(block);
+ }
+
+ public OnDiskSuperBlock getSuperBlock(int idx)
+ {
+ assert idx < superBlockCnt : String.format("requested index %d is greater than super block count %d", idx, superBlockCnt);
+ long blockOffset = indexFile.getLong(superBlocksOffset + idx * 8);
+ return new OnDiskSuperBlock(indexFile.duplicate().position(blockOffset));
+ }
+ }
+
+ protected class OnDiskSuperBlock
+ {
+ private final TokenTree tokenTree;
+
+ public OnDiskSuperBlock(MappedBuffer buffer)
+ {
+ tokenTree = new TokenTree(descriptor, buffer);
+ }
+
+ public RangeIterator<Long, Token> iterator()
+ {
+ return tokenTree.iterator(keyFetcher);
+ }
+ }
+
+ protected abstract class Level<T extends OnDiskBlock>
+ {
+ protected final long blockOffsets;
+ protected final int blockCount;
+
+ public Level(long offsets, int count)
+ {
+ this.blockOffsets = offsets;
+ this.blockCount = count;
+ }
+
+ public T getBlock(int idx) throws FSReadError
+ {
+ assert idx >= 0 && idx < blockCount;
+
+ // calculate the block offset and move there
+ // (long is intentional; we just need an mmap implementation which supports long positions)
+ long blockOffset = indexFile.getLong(blockOffsets + idx * 8);
+ return cast(indexFile.duplicate().position(blockOffset));
+ }
+
+ protected abstract T cast(MappedBuffer block);
+ }
+
+ protected class DataBlock extends OnDiskBlock<DataTerm>
+ {
+ public DataBlock(MappedBuffer data)
+ {
+ super(descriptor, data, BlockType.DATA);
+ }
+
+ protected DataTerm cast(MappedBuffer data)
+ {
+ return new DataTerm(data, termSize, getBlockIndex());
+ }
+
+ public RangeIterator<Long, Token> getRange(int start, int end)
+ {
+ RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+ NavigableMap<Long, Token> sparse = new TreeMap<>();
+
+ for (int i = start; i < end; i++)
+ {
+ DataTerm term = getTerm(i);
+
+ if (term.isSparse())
+ {
+ NavigableMap<Long, Token> tokens = term.getSparseTokens();
+ for (Map.Entry<Long, Token> t : tokens.entrySet())
+ {
+ Token token = sparse.get(t.getKey());
+ if (token == null)
+ sparse.put(t.getKey(), t.getValue());
+ else
+ token.merge(t.getValue());
+ }
+ }
+ else
+ {
+ builder.add(term.getTokens());
+ }
+ }
+
+ PrefetchedTokensIterator prefetched = sparse.isEmpty() ? null : new PrefetchedTokensIterator(sparse);
+
+ if (builder.rangeCount() == 0)
+ return prefetched;
+
+ builder.add(prefetched);
+ return builder.build();
+ }
+ }
+
+ protected class PointerBlock extends OnDiskBlock<PointerTerm>
+ {
+ public PointerBlock(MappedBuffer block)
+ {
+ super(descriptor, block, BlockType.POINTER);
+ }
+
+ protected PointerTerm cast(MappedBuffer data)
+ {
+ return new PointerTerm(data, termSize);
+ }
+ }
+
+ public class DataTerm extends Term implements Comparable<DataTerm>
+ {
+ private final TokenTree perBlockIndex;
+
+ protected DataTerm(MappedBuffer content, OnDiskIndexBuilder.TermSize size, TokenTree perBlockIndex)
+ {
+ super(content, size);
+ this.perBlockIndex = perBlockIndex;
+ }
+
+ public RangeIterator<Long, Token> getTokens()
+ {
+ final long blockEnd = FBUtilities.align(content.position(), OnDiskIndexBuilder.BLOCK_SIZE);
+
+ if (isSparse())
+ return new PrefetchedTokensIterator(getSparseTokens());
+
+ long offset = blockEnd + 4 + content.getInt(getDataOffset() + 1);
+ return new TokenTree(descriptor, indexFile.duplicate().position(offset)).iterator(keyFetcher);
+ }
+
+ public boolean isSparse()
+ {
+ return content.get(getDataOffset()) > 0;
+ }
+
+ public NavigableMap<Long, Token> getSparseTokens()
+ {
+ long ptrOffset = getDataOffset();
+
+ byte size = content.get(ptrOffset);
+
+ assert size > 0;
+
+ NavigableMap<Long, Token> individualTokens = new TreeMap<>();
+ for (int i = 0; i < size; i++)
+ {
+ Token token = perBlockIndex.get(content.getLong(ptrOffset + 1 + (8 * i)), keyFetcher);
+
+ assert token != null;
+ individualTokens.put(token.get(), token);
+ }
+
+ return individualTokens;
+ }
+
+ public int compareTo(DataTerm other)
+ {
+ return other == null ? 1 : compareTo(comparator, other.getTerm());
+ }
+ }
+
+ protected static class PointerTerm extends Term
+ {
+ public PointerTerm(MappedBuffer content, OnDiskIndexBuilder.TermSize size)
+ {
+ super(content, size);
+ }
+
+ public int getBlock()
+ {
+ return content.getInt(getDataOffset());
+ }
+ }
+
+ private static class PrefetchedTokensIterator extends RangeIterator<Long, Token>
+ {
+ private final NavigableMap<Long, Token> tokens;
+ private PeekingIterator<Token> currentIterator;
+
+ public PrefetchedTokensIterator(NavigableMap<Long, Token> tokens)
+ {
+ super(tokens.firstKey(), tokens.lastKey(), tokens.size());
+ this.tokens = tokens;
+ this.currentIterator = Iterators.peekingIterator(tokens.values().iterator());
+ }
+
+ protected Token computeNext()
+ {
+ return currentIterator != null && currentIterator.hasNext()
+ ? currentIterator.next()
+ : endOfData();
+ }
+
+ protected void performSkipTo(Long nextToken)
+ {
+ currentIterator = Iterators.peekingIterator(tokens.tailMap(nextToken, true).values().iterator());
+ }
+
+ public void close() throws IOException
+ {
+ endOfData();
+ }
+ }
+
+ public AbstractType<?> getComparator()
+ {
+ return comparator;
+ }
+
+ public String getIndexPath()
+ {
+ return indexPath;
+ }
+
+ private class TermIterator extends AbstractIterator<DataTerm>
+ {
+ private final Expression e;
+ private final IteratorOrder order;
+
+ protected OnDiskBlock<DataTerm> currentBlock;
+ protected int blockIndex, offset;
+
+ private boolean checkLower = true, checkUpper = true;
+
+ public TermIterator(int startBlock, Expression expression, IteratorOrder order)
+ {
+ this.e = expression;
+ this.order = order;
+ this.blockIndex = startBlock;
+
+ nextBlock();
+ }
+
+ protected DataTerm computeNext()
+ {
+ for (;;)
+ {
+ if (currentBlock == null)
+ return endOfData();
+
+ if (offset >= 0 && offset < currentBlock.termCount())
+ {
+ DataTerm currentTerm = currentBlock.getTerm(nextOffset());
+
+ if (checkLower && !e.isLowerSatisfiedBy(currentTerm))
+ continue;
+
+ // flip the flag right on the first bounds match
+ // to avoid expensive comparisons
+ checkLower = false;
+
+ if (checkUpper && !e.isUpperSatisfiedBy(currentTerm))
+ return endOfData();
+
+ return currentTerm;
+ }
+
+ nextBlock();
+ }
+ }
+
+ protected void nextBlock()
+ {
+ currentBlock = null;
+
+ if (blockIndex < 0 || blockIndex >= dataLevel.blockCount)
+ return;
+
+ currentBlock = dataLevel.getBlock(nextBlockIndex());
+ offset = checkLower ? order.startAt(currentBlock, e) : currentBlock.minOffset(order);
+
+ // check the last term of the new block right away;
+ // if the expression's upper bound is satisfied by it, we can avoid
+ // doing any expensive upper bound checks for the rest of that block.
+ checkUpper = e.hasUpper() && !e.isUpperSatisfiedBy(currentBlock.getTerm(currentBlock.maxOffset(order)));
+ }
+
+ protected int nextBlockIndex()
+ {
+ int current = blockIndex;
+ blockIndex += order.step;
+ return current;
+ }
+
+ protected int nextOffset()
+ {
+ int current = offset;
+ offset += order.step;
+ return current;
+ }
+ }
+}
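
The exclusion handling in search() turns a bounded range with k in-bounds exclusions into k + 1 sub-ranges whose results are then unioned; each split point becomes a non-inclusive bound on both sides. A minimal sketch of the split with integers standing in for terms (values invented):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RangeSplitSketch
    {
        public static void main(String[] args)
        {
            int lower = 0, upper = 100;                       // query bounds
            List<Integer> exclusions = Arrays.asList(25, 50); // sorted, in-bounds exclusions

            List<int[]> ranges = new ArrayList<>();
            int min = lower;
            for (int exclusion : exclusions)
            {
                // both split points are non-inclusive bounds in the real code,
                // so the excluded value itself falls out of every sub-range
                ranges.add(new int[]{ min, exclusion });
                min = exclusion;
            }
            ranges.add(new int[]{ min, upper });

            for (int[] r : ranges)
                System.out.printf("(%d, %d)%n", r[0], r[1]); // (0, 25) (25, 50) (50, 100)
        }
    }
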
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
new file mode 100644
index 0000000..7b8f5c9
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/OnDiskIndexBuilder.java
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.sa.IntegralSA;
+import org.apache.cassandra.index.sasi.sa.SA;
+import org.apache.cassandra.index.sasi.sa.TermIterator;
+import org.apache.cassandra.index.sasi.sa.SuffixSA;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import com.carrotsearch.hppc.LongArrayList;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.ShortArrayList;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OnDiskIndexBuilder
+{
+ private static final Logger logger = LoggerFactory.getLogger(OnDiskIndexBuilder.class);
+
+ public enum Mode
+ {
+ PREFIX, CONTAINS, SPARSE;
+
+ public static Mode mode(String mode)
+ {
+ return Mode.valueOf(mode.toUpperCase());
+ }
+ }
+
+ public enum TermSize
+ {
+ INT(4), LONG(8), UUID(16), VARIABLE(-1);
+
+ public final int size;
+
+ TermSize(int size)
+ {
+ this.size = size;
+ }
+
+ public boolean isConstant()
+ {
+ return this != VARIABLE;
+ }
+
+ public static TermSize of(int size)
+ {
+ switch (size)
+ {
+ case -1:
+ return VARIABLE;
+
+ case 4:
+ return INT;
+
+ case 8:
+ return LONG;
+
+ case 16:
+ return UUID;
+
+ default:
+ throw new IllegalStateException("unknown state: " + size);
+ }
+ }
+
+ public static TermSize sizeOf(AbstractType<?> comparator)
+ {
+ if (comparator instanceof Int32Type || comparator instanceof FloatType)
+ return INT;
+
+ if (comparator instanceof LongType || comparator instanceof DoubleType
+ || comparator instanceof TimestampType || comparator instanceof DateType)
+ return LONG;
+
+ if (comparator instanceof TimeUUIDType || comparator instanceof UUIDType)
+ return UUID;
+
+ return VARIABLE;
+ }
+ }
+
+ public static final int BLOCK_SIZE = 4096;
+ public static final int MAX_TERM_SIZE = 1024;
+ public static final int SUPER_BLOCK_SIZE = 64;
+
+ private final List<MutableLevel<InMemoryPointerTerm>> levels = new ArrayList<>();
+ private MutableLevel<InMemoryDataTerm> dataLevel;
+
+ private final TermSize termSize;
+
+ private final AbstractType<?> keyComparator, termComparator;
+
+ private final Map<ByteBuffer, TokenTreeBuilder> terms;
+ private final Mode mode;
+
+ private ByteBuffer minKey, maxKey;
+ private long estimatedBytes;
+
+ public OnDiskIndexBuilder(AbstractType<?> keyComparator, AbstractType<?> comparator, Mode mode)
+ {
+ this.keyComparator = keyComparator;
+ this.termComparator = comparator;
+ this.terms = new HashMap<>();
+ this.termSize = TermSize.sizeOf(comparator);
+ this.mode = mode;
+ }
+
+ public OnDiskIndexBuilder add(ByteBuffer term, DecoratedKey key, long keyPosition)
+ {
+ if (term.remaining() >= MAX_TERM_SIZE)
+ {
+ logger.error("Rejecting value (value size {}, maximum size {} bytes).", term.remaining(), Short.MAX_VALUE);
+ return this;
+ }
+
+ TokenTreeBuilder tokens = terms.get(term);
+ if (tokens == null)
+ {
+ terms.put(term, (tokens = new TokenTreeBuilder()));
+
+ // on-heap size estimates from jol
+ // 64 bytes for TTB + 48 bytes for TreeMap in TTB + size bytes for the term (map key)
+ estimatedBytes += 64 + 48 + term.remaining();
+ }
+
+ tokens.add((Long) key.getToken().getTokenValue(), keyPosition);
+
+ // calculate key range (based on actual key values) for current index
+ minKey = (minKey == null || keyComparator.compare(minKey, key.getKey()) > 0) ? key.getKey() : minKey;
+ maxKey = (maxKey == null || keyComparator.compare(maxKey, key.getKey()) < 0) ? key.getKey() : maxKey;
+
+ // 60 ((boolean(1)*4) + (long(8)*4) + 24) bytes for the LongOpenHashSet created when the keyPosition was added
+ // + 40 bytes for the TreeMap.Entry + 8 bytes for the token (key).
+ // in the case of hash collision for the token we may overestimate but this is extremely rare
+ estimatedBytes += 60 + 40 + 8;
+
+ return this;
+ }
+
+ public long estimatedMemoryUse()
+ {
+ return estimatedBytes;
+ }
+
+ private void addTerm(InMemoryDataTerm term, SequentialWriter out) throws IOException
+ {
+ InMemoryPointerTerm ptr = dataLevel.add(term);
+ if (ptr == null)
+ return;
+
+ int levelIdx = 0;
+ for (;;)
+ {
+ MutableLevel<InMemoryPointerTerm> level = getIndexLevel(levelIdx++, out);
+ if ((ptr = level.add(ptr)) == null)
+ break;
+ }
+ }
+
+ public boolean isEmpty()
+ {
+ return terms.isEmpty();
+ }
+
+ public void finish(Pair<ByteBuffer, ByteBuffer> range, File file, TermIterator terms)
+ {
+ finish(Descriptor.CURRENT, range, file, terms);
+ }
+
+ /**
+ * Finishes up index building process by creating/populating index file.
+ *
+ * @param indexFile The file to write index contents to.
+ *
+ * @return true if index was written successfully, false otherwise (e.g. if index was empty).
+ *
+ * @throws FSWriteError on I/O error.
+ */
+ public boolean finish(File indexFile) throws FSWriteError
+ {
+ return finish(Descriptor.CURRENT, indexFile);
+ }
+
+ @VisibleForTesting
+ protected boolean finish(Descriptor descriptor, File file) throws FSWriteError
+ {
+ // no terms means there is nothing to build
+ if (terms.isEmpty())
+ return false;
+
+ // split terms into suffixes only for text fields; otherwise (even if CONTAINS is set) use the terms in their original form
+ SA sa = ((termComparator instanceof UTF8Type || termComparator instanceof AsciiType) && mode == Mode.CONTAINS)
+ ? new SuffixSA(termComparator, mode) : new IntegralSA(termComparator, mode);
+
+ for (Map.Entry<ByteBuffer, TokenTreeBuilder> term : terms.entrySet())
+ sa.add(term.getKey(), term.getValue());
+
+ finish(descriptor, Pair.create(minKey, maxKey), file, sa.finish());
+ return true;
+ }
+
+ protected void finish(Descriptor descriptor, Pair<ByteBuffer, ByteBuffer> range, File file, TermIterator terms)
+ {
+ SequentialWriter out = null;
+
+ try
+ {
+ out = new SequentialWriter(file, BLOCK_SIZE, BufferType.ON_HEAP);
+
+ out.writeUTF(descriptor.version.toString());
+
+ out.writeShort(termSize.size);
+
+ // min, max term (useful to find initial scan range from search expressions)
+ ByteBufferUtil.writeWithShortLength(terms.minTerm(), out);
+ ByteBufferUtil.writeWithShortLength(terms.maxTerm(), out);
+
+ // min, max keys covered by index (useful when searching across multiple indexes)
+ ByteBufferUtil.writeWithShortLength(range.left, out);
+ ByteBufferUtil.writeWithShortLength(range.right, out);
+
+ out.writeUTF(mode.toString());
+
+ out.skipBytes((int) (BLOCK_SIZE - out.position()));
+
+ dataLevel = mode == Mode.SPARSE ? new DataBuilderLevel(out, new MutableDataBlock(mode))
+ : new MutableLevel<>(out, new MutableDataBlock(mode));
+ while (terms.hasNext())
+ {
+ Pair<ByteBuffer, TokenTreeBuilder> term = terms.next();
+ addTerm(new InMemoryDataTerm(term.left, term.right), out);
+ }
+
+ dataLevel.finalFlush();
+ for (MutableLevel l : levels)
+ l.flush(); // flush all of the buffers
+
+ // and finally write levels index
+ final long levelIndexPosition = out.position();
+
+ out.writeInt(levels.size());
+ for (int i = levels.size() - 1; i >= 0; i--)
+ levels.get(i).flushMetadata();
+
+ dataLevel.flushMetadata();
+
+ out.writeLong(levelIndexPosition);
+
+ // sync contents of the output and disk,
+ // since it's not done implicitly on close
+ out.sync();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, file);
+ }
+ finally
+ {
+ FileUtils.closeQuietly(out);
+ }
+ }
+
+ private MutableLevel<InMemoryPointerTerm> getIndexLevel(int idx, SequentialWriter out)
+ {
+ if (levels.size() == 0)
+ levels.add(new MutableLevel<>(out, new MutableBlock<>()));
+
+ if (levels.size() - 1 < idx)
+ {
+ int toAdd = idx - (levels.size() - 1);
+ for (int i = 0; i < toAdd; i++)
+ levels.add(new MutableLevel<>(out, new MutableBlock<>()));
+ }
+
+ return levels.get(idx);
+ }
+
+ protected static void alignToBlock(SequentialWriter out) throws IOException
+ {
+ long endOfBlock = out.position();
+ if ((endOfBlock & (BLOCK_SIZE - 1)) != 0) // align on the block boundary if needed
+ out.skipBytes((int) (FBUtilities.align(endOfBlock, BLOCK_SIZE) - endOfBlock));
+ }
+
+ private class InMemoryTerm
+ {
+ protected final ByteBuffer term;
+
+ public InMemoryTerm(ByteBuffer term)
+ {
+ this.term = term;
+ }
+
+ public int serializedSize()
+ {
+ return (termSize.isConstant() ? 0 : 2) + term.remaining();
+ }
+
+ public void serialize(DataOutputPlus out) throws IOException
+ {
+ if (termSize.isConstant())
+ out.write(term);
+ else
+ ByteBufferUtil.writeWithShortLength(term, out);
+ }
+ }
+
+ private class InMemoryPointerTerm extends InMemoryTerm
+ {
+ protected final int blockCnt;
+
+ public InMemoryPointerTerm(ByteBuffer term, int blockCnt)
+ {
+ super(term);
+ this.blockCnt = blockCnt;
+ }
+
+ public int serializedSize()
+ {
+ return super.serializedSize() + 4;
+ }
+
+ public void serialize(DataOutputPlus out) throws IOException
+ {
+ super.serialize(out);
+ out.writeInt(blockCnt);
+ }
+ }
+
+ private class InMemoryDataTerm extends InMemoryTerm
+ {
+ private final TokenTreeBuilder keys;
+
+ public InMemoryDataTerm(ByteBuffer term, TokenTreeBuilder keys)
+ {
+ super(term);
+ this.keys = keys;
+ }
+ }
+
+ private class MutableLevel<T extends InMemoryTerm>
+ {
+ private final LongArrayList blockOffsets = new LongArrayList();
+
+ protected final SequentialWriter out;
+
+ private final MutableBlock<T> inProcessBlock;
+ private InMemoryPointerTerm lastTerm;
+
+ public MutableLevel(SequentialWriter out, MutableBlock<T> block)
+ {
+ this.out = out;
+ this.inProcessBlock = block;
+ }
+
+ /**
+ * @return If we flushed a block, return the last term of that block; else, null.
+ */
+ public InMemoryPointerTerm add(T term) throws IOException
+ {
+ InMemoryPointerTerm toPromote = null;
+
+ if (!inProcessBlock.hasSpaceFor(term))
+ {
+ flush();
+ toPromote = lastTerm;
+ }
+
+ inProcessBlock.add(term);
+
+ lastTerm = new InMemoryPointerTerm(term.term, blockOffsets.size());
+ return toPromote;
+ }
+
+ public void flush() throws IOException
+ {
+ blockOffsets.add(out.position());
+ inProcessBlock.flushAndClear(out);
+ }
+
+ public void finalFlush() throws IOException
+ {
+ flush();
+ }
+
+ public void flushMetadata() throws IOException
+ {
+ flushMetadata(blockOffsets);
+ }
+
+ protected void flushMetadata(LongArrayList longArrayList) throws IOException
+ {
+ out.writeInt(longArrayList.size());
+ for (int i = 0; i < longArrayList.size(); i++)
+ out.writeLong(longArrayList.get(i));
+ }
+ }
+
+ /** builds standard data blocks, as well as super blocks */
+ private class DataBuilderLevel extends MutableLevel<InMemoryDataTerm>
+ {
+ private final LongArrayList superBlockOffsets = new LongArrayList();
+
+ /** count of regular data blocks written since the current super block was initialized */
+ private int dataBlocksCnt;
+ private TokenTreeBuilder superBlockTree;
+
+ public DataBuilderLevel(SequentialWriter out, MutableBlock<InMemoryDataTerm> block)
+ {
+ super(out, block);
+ superBlockTree = new TokenTreeBuilder();
+ }
+
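+ // a non-null pointer from super.add() means a data block was just flushed; after
+ // SUPER_BLOCK_SIZE of those, the accumulated token tree is written out as a super block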
+ public InMemoryPointerTerm add(InMemoryDataTerm term) throws IOException
+ {
+ InMemoryPointerTerm ptr = super.add(term);
+ if (ptr != null)
+ {
+ dataBlocksCnt++;
+ flushSuperBlock(false);
+ }
+ superBlockTree.add(term.keys.getTokens());
+ return ptr;
+ }
+
+ public void flushSuperBlock(boolean force) throws IOException
+ {
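+ // write a super block either once enough data blocks have accumulated,
+ // or when forced at the end (provided there is anything to write)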
+ if (dataBlocksCnt == SUPER_BLOCK_SIZE || (force && !superBlockTree.getTokens().isEmpty()))
+ {
+ superBlockOffsets.add(out.position());
+ superBlockTree.finish().write(out);
+ alignToBlock(out);
+
+ dataBlocksCnt = 0;
+ superBlockTree = new TokenTreeBuilder();
+ }
+ }
+
+ public void finalFlush() throws IOException
+ {
+ super.flush();
+ flushSuperBlock(true);
+ }
+
+ public void flushMetadata() throws IOException
+ {
+ super.flushMetadata();
+ flushMetadata(superBlockOffsets);
+ }
+ }
+
+ private static class MutableBlock<T extends InMemoryTerm>
+ {
+ protected final DataOutputBufferFixed buffer;
+ protected final ShortArrayList offsets;
+
+ public MutableBlock()
+ {
+ buffer = new DataOutputBufferFixed(BLOCK_SIZE);
+ offsets = new ShortArrayList();
+ }
+
+ public final void add(T term) throws IOException
+ {
+ offsets.add((short) buffer.position());
+ addInternal(term);
+ }
+
+ protected void addInternal(T term) throws IOException
+ {
+ term.serialize(buffer);
+ }
+
+ public boolean hasSpaceFor(T element)
+ {
+ return sizeAfter(element) < BLOCK_SIZE;
+ }
+
+ protected int sizeAfter(T element)
+ {
+ return getWatermark() + 4 + element.serializedSize();
+ }
+
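+ // bytes used so far: 4-byte offset count + 2 bytes per recorded offset + serialized term data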
+ protected int getWatermark()
+ {
+ return 4 + offsets.size() * 2 + (int) buffer.position();
+ }
+
+ public void flushAndClear(SequentialWriter out) throws IOException
+ {
+ out.writeInt(offsets.size());
+ for (int i = 0; i < offsets.size(); i++)
+ out.writeShort(offsets.get(i));
+
+ out.write(buffer.buffer());
+
+ alignToBlock(out);
+
+ offsets.clear();
+ buffer.clear();
+ }
+ }
+
+ private static class MutableDataBlock extends MutableBlock<InMemoryDataTerm>
+ {
+ private final Mode mode;
+
+ private int offset = 0;
+ private int sparseValueTerms = 0;
+
+ private final List<TokenTreeBuilder> containers = new ArrayList<>();
+ private TokenTreeBuilder combinedIndex;
+
+ public MutableDataBlock(Mode mode)
+ {
+ this.mode = mode;
+ this.combinedIndex = new TokenTreeBuilder();
+ }
+
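+ // SPARSE terms with at most 5 tokens are embedded directly in the block; everything else
+ // gets a 4-byte pointer into the token-tree container section written after the block (see ptrLength)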
+ protected void addInternal(InMemoryDataTerm term) throws IOException
+ {
+ TokenTreeBuilder keys = term.keys;
+
+ if (mode == Mode.SPARSE && keys.getTokenCount() <= 5)
+ {
+ writeTerm(term, keys);
+ sparseValueTerms++;
+ }
+ else
+ {
+ writeTerm(term, offset);
+
+ offset += keys.serializedSize();
+ containers.add(keys);
+ }
+
+ if (mode == Mode.SPARSE)
+ combinedIndex.add(keys.getTokens());
+ }
+
+ protected int sizeAfter(InMemoryDataTerm element)
+ {
+ return super.sizeAfter(element) + ptrLength(element);
+ }
+
+ public void flushAndClear(SequentialWriter out) throws IOException
+ {
+ super.flushAndClear(out);
+
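+ // -1 signals "no combined sparse index follows"; otherwise this appears to be the
+ // offset of the combined index, located just past the token-tree containers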
+ out.writeInt((sparseValueTerms == 0) ? -1 : offset);
+
+ if (containers.size() > 0)
+ {
+ for (TokenTreeBuilder tokens : containers)
+ tokens.write(out);
+ }
+
+ if (sparseValueTerms > 0)
+ {
+ combinedIndex.finish().write(out);
+ }
+
+ alignToBlock(out);
+
+ containers.clear();
+ combinedIndex = new TokenTreeBuilder();
+
+ offset = 0;
+ sparseValueTerms = 0;
+ }
+
+ private int ptrLength(InMemoryDataTerm term)
+ {
+ return (term.keys.getTokenCount() > 5)
+ ? 5 // 1 byte type + 4 byte offset to the tree
+ : 1 + (8 * (int) term.keys.getTokenCount()); // 1 byte size + n 8 byte tokens
+ }
+
+ private void writeTerm(InMemoryTerm term, TokenTreeBuilder keys) throws IOException
+ {
+ term.serialize(buffer);
+ buffer.writeByte((byte) keys.getTokenCount());
+
+ Iterator<Pair<Long, LongSet>> tokens = keys.iterator();
+ while (tokens.hasNext())
+ buffer.writeLong(tokens.next().left);
+ }
+
+ private void writeTerm(InMemoryTerm term, int offset) throws IOException
+ {
+ term.serialize(buffer);
+ buffer.writeByte(0x0);
+ buffer.writeInt(offset);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
new file mode 100644
index 0000000..6e63c71
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/PerSSTableIndexWriter.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.utils.CombinedTermIterator;
+import org.apache.cassandra.index.sasi.utils.TypeUtil;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PerSSTableIndexWriter implements SSTableFlushObserver
+{
+ private static final Logger logger = LoggerFactory.getLogger(PerSSTableIndexWriter.class);
+
+ private static final ThreadPoolExecutor INDEX_FLUSHER_MEMTABLE;
+ private static final ThreadPoolExecutor INDEX_FLUSHER_GENERAL;
+
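+ // separate executors for index flushes triggered by memtable flush vs. compaction (see getExecutor())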
+ static
+ {
+ INDEX_FLUSHER_GENERAL = new JMXEnabledThreadPoolExecutor(1, 8, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory("SASI-General"),
+ "internal");
+ INDEX_FLUSHER_GENERAL.allowCoreThreadTimeOut(true);
+
+ INDEX_FLUSHER_MEMTABLE = new JMXEnabledThreadPoolExecutor(1, 8, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory("SASI-Memtable"),
+ "internal");
+ INDEX_FLUSHER_MEMTABLE.allowCoreThreadTimeOut(true);
+ }
+
+ private final int nowInSec = FBUtilities.nowInSeconds();
+
+ private final Descriptor descriptor;
+ private final OperationType source;
+
+ private final AbstractType<?> keyValidator;
+ private final Map<ColumnDefinition, ColumnIndex> supportedIndexes;
+
+ @VisibleForTesting
+ protected final Map<ColumnDefinition, Index> indexes;
+
+ private DecoratedKey currentKey;
+ private long currentKeyPosition;
+ private boolean isComplete;
+
+ public PerSSTableIndexWriter(AbstractType<?> keyValidator,
+ Descriptor descriptor,
+ OperationType source,
+ Map<ColumnDefinition, ColumnIndex> supportedIndexes)
+ {
+ this.keyValidator = keyValidator;
+ this.descriptor = descriptor;
+ this.source = source;
+ this.supportedIndexes = supportedIndexes;
+ this.indexes = new HashMap<>();
+ }
+
+ public void begin()
+ {}
+
+ public void startPartition(DecoratedKey key, long curPosition)
+ {
+ currentKey = key;
+ currentKeyPosition = curPosition;
+ }
+
+ public void nextUnfilteredCluster(Unfiltered unfiltered)
+ {
+ if (!unfiltered.isRow())
+ return;
+
+ Row row = (Row) unfiltered;
+
+ supportedIndexes.keySet().forEach((column) -> {
+ ByteBuffer value = ColumnIndex.getValueOf(column, row, nowInSec);
+ if (value == null)
+ return;
+
+ ColumnIndex columnIndex = supportedIndexes.get(column);
+ if (columnIndex == null)
+ return;
+
+ Index index = indexes.get(column);
+ if (index == null)
+ indexes.put(column, (index = new Index(columnIndex)));
+
+ index.add(value.duplicate(), currentKey, currentKeyPosition);
+ });
+ }
+
+ public void complete()
+ {
+ if (isComplete)
+ return;
+
+ currentKey = null;
+
+ try
+ {
+ CountDownLatch latch = new CountDownLatch(indexes.size());
+ for (Index index : indexes.values())
+ index.complete(latch);
+
+ Uninterruptibles.awaitUninterruptibly(latch);
+ }
+ finally
+ {
+ indexes.clear();
+ isComplete = true;
+ }
+ }
+
+ public Index getIndex(ColumnDefinition columnDef)
+ {
+ return indexes.get(columnDef);
+ }
+
+ public Descriptor getDescriptor()
+ {
+ return descriptor;
+ }
+
+ @VisibleForTesting
+ protected class Index
+ {
+ private final ColumnIndex columnIndex;
+ private final String outputFile;
+ private final AbstractAnalyzer analyzer;
+ private final long maxMemorySize;
+
+ @VisibleForTesting
+ protected final Set<Future<OnDiskIndex>> segments;
+ private int segmentNumber = 0;
+
+ private OnDiskIndexBuilder currentBuilder;
+
+ public Index(ColumnIndex columnIndex)
+ {
+ this.columnIndex = columnIndex;
+ this.outputFile = descriptor.filenameFor(columnIndex.getComponent());
+ this.analyzer = columnIndex.getAnalyzer();
+ this.segments = new HashSet<>();
+ this.maxMemorySize = maxMemorySize(columnIndex);
+ this.currentBuilder = newIndexBuilder();
+ }
+
+ public void add(ByteBuffer term, DecoratedKey key, long keyPosition)
+ {
+ if (term.remaining() == 0)
+ return;
+
+ boolean isAdded = false;
+
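+ // run the raw value through the analyzer; each produced token is size-checked and
+ // validated (upcasting if necessary) before being added to the in-memory segment builder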
+ analyzer.reset(term);
+ while (analyzer.hasNext())
+ {
+ ByteBuffer token = analyzer.next();
+ int size = token.remaining();
+
+ if (token.remaining() >= OnDiskIndexBuilder.MAX_TERM_SIZE)
+ {
+ logger.info("Rejecting value (size {}, maximum {} bytes) for column {} (analyzed {}) at {} SSTable.",
+ term.remaining(),
+ OnDiskIndexBuilder.MAX_TERM_SIZE,
+ columnIndex.getColumnName(),
+ columnIndex.getMode().isAnalyzed,
+ descriptor);
+ continue;
+ }
+
+ if (!TypeUtil.isValid(token, columnIndex.getValidator()))
+ {
+ if ((token = TypeUtil.tryUpcast(token, columnIndex.getValidator())) == null)
+ {
+ logger.info("({}) Failed to add {} to index for key: {}, value size was {} bytes, validator is {}.",
+ outputFile,
+ columnIndex.getColumnName(),
+ keyValidator.getString(key.getKey()),
+ size,
+ columnIndex.getValidator());
+ continue;
+ }
+ }
+
+ currentBuilder.add(token, key, keyPosition);
+ isAdded = true;
+ }
+
+ if (!isAdded || currentBuilder.estimatedMemoryUse() < maxMemorySize)
+ return; // none of the generated tokens were added to the index, or the memory limit hasn't been reached yet
+
+ segments.add(getExecutor().submit(scheduleSegmentFlush(false)));
+ }
+
+ @VisibleForTesting
+ protected Callable<OnDiskIndex> scheduleSegmentFlush(final boolean isFinal)
+ {
+ final OnDiskIndexBuilder builder = currentBuilder;
+ currentBuilder = newIndexBuilder();
+
+ final String segmentFile = filename(isFinal);
+
+ return () -> {
+ long start1 = System.nanoTime();
+
+ try
+ {
+ File index = new File(segmentFile);
+ return builder.finish(index) ? new OnDiskIndex(index, columnIndex.getValidator(), null) : null;
+ }
+ finally
+ {
+ if (!isFinal)
+ logger.info("Flushed index segment {}, took {} ms.", segmentFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start1));
+ }
+ };
+ }
+
+ public void complete(final CountDownLatch latch)
+ {
+ logger.info("Scheduling index flush to {}", outputFile);
+
+ getExecutor().submit((Runnable) () -> {
+ long start1 = System.nanoTime();
+
+ OnDiskIndex[] parts = new OnDiskIndex[segments.size() + 1];
+
+ try
+ {
+ // no parts present, build entire index from memory
+ if (segments.isEmpty())
+ {
+ scheduleSegmentFlush(true).call();
+ return;
+ }
+
+ // parts are present but there is something still in memory, let's flush that inline
+ if (!currentBuilder.isEmpty())
+ {
+ OnDiskIndex last = scheduleSegmentFlush(false).call();
+ segments.add(Futures.immediateFuture(last));
+ }
+
+ int index = 0;
+ ByteBuffer combinedMin = null, combinedMax = null;
+
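+ // collect all flushed segments, tracking the overall min/max partition keys across them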
+ for (Future<OnDiskIndex> f : segments)
+ {
+ OnDiskIndex part = Futures.getUnchecked(f);
+ if (part == null)
+ continue;
+
+ parts[index++] = part;
+ combinedMin = (combinedMin == null || keyValidator.compare(combinedMin, part.minKey()) > 0) ? part.minKey() : combinedMin;
+ combinedMax = (combinedMax == null || keyValidator.compare(combinedMax, part.maxKey()) < 0) ? part.maxKey() : combinedMax;
+ }
+
+ OnDiskIndexBuilder builder = newIndexBuilder();
+ builder.finish(Pair.create(combinedMin, combinedMax),
+ new File(outputFile),
+ new CombinedTermIterator(parts));
+ }
+ catch (Exception e)
+ {
+ logger.error("Failed to flush index {}.", outputFile, e);
+ FileUtils.delete(outputFile);
+ }
+ finally
+ {
+ logger.info("Index flush to {} took {} ms.", outputFile, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start1));
+
+ for (OnDiskIndex part : parts)
+ {
+ if (part == null)
+ continue;
+
+ FileUtils.closeQuietly(part);
+ FileUtils.delete(part.getIndexPath());
+ }
+
+ latch.countDown();
+ }
+ });
+ }
+
+ private ExecutorService getExecutor()
+ {
+ return source == OperationType.FLUSH ? INDEX_FLUSHER_MEMTABLE : INDEX_FLUSHER_GENERAL;
+ }
+
+ private OnDiskIndexBuilder newIndexBuilder()
+ {
+ return new OnDiskIndexBuilder(keyValidator, columnIndex.getValidator(), columnIndex.getMode().mode);
+ }
+
+ public String filename(boolean isFinal)
+ {
+ return outputFile + (isFinal ? "" : "_" + segmentNumber++);
+ }
+ }
+
+ protected long maxMemorySize(ColumnIndex columnIndex)
+ {
+ // 1GB (1073741824 bytes) for memtable flushes; compactions use the per-index configured limit
+ return source == OperationType.FLUSH ? 1073741824L : columnIndex.getMode().maxCompactionFlushMemoryInMb;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return descriptor.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return o instanceof PerSSTableIndexWriter && descriptor.equals(((PerSSTableIndexWriter) o).descriptor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/Token.java b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
new file mode 100644
index 0000000..02130a3
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/Token.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import com.google.common.primitives.Longs;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.utils.CombinedValue;
+
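+ // base class for index tokens: wraps the 64-bit token value and iterates the
+ // DecoratedKeys associated with it; concrete subclasses supply the key iteration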
+public abstract class Token implements CombinedValue<Long>, Iterable<DecoratedKey>
+{
+ protected final long token;
+
+ public Token(long token)
+ {
+ this.token = token;
+ }
+
+ public Long get()
+ {
+ return token;
+ }
+
+ public int compareTo(CombinedValue<Long> o)
+ {
+ return Longs.compare(token, ((Token) o).token);
+ }
+}