You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2016/01/24 04:36:12 UTC
[10/14] cassandra git commit: Integrate SASI index into Cassandra
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
new file mode 100644
index 0000000..5d85d00
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTree.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.utils.AbstractIterator;
+import org.apache.cassandra.index.sasi.utils.CombinedValue;
+import org.apache.cassandra.index.sasi.utils.MappedBuffer;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.utils.MergeIterator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import static org.apache.cassandra.index.sasi.disk.TokenTreeBuilder.EntryType;
+
+// Note: all of the seek-able offsets contained in TokenTree should be sizeof(long)
+// even if currently only the lower int portion of them is used, because that makes
+// it possible to switch to an mmap implementation which supports long positions
+// without any on-disk format changes and/or re-indexing if one day we need to.
+/**
+ * Read-side view of a token tree serialized by {@link TokenTreeBuilder}.
+ *
+ * Every block starts with a header consisting of an info byte (bit 0: leaf, bit 1: last leaf),
+ * a short token count and the block's min/max tokens. The root header additionally carries the
+ * format magic (for the AB descriptor version) and the tree-wide token count and min/max tokens.
+ * Interior blocks hold their separator tokens followed by one long child offset per child; leaf
+ * blocks hold 16-byte entries (short entry type, short "extra", long token, int offset data),
+ * optionally followed by an overflow trailer of long key offsets.
+ *
+ * NOTE(review): {@link #get(long, Function)} repositions the shared underlying buffer, so
+ * instances are presumably not safe for concurrent lookups without external coordination.
+ */
+public class TokenTree
+{
+    private static final int LONG_BYTES = Long.SIZE / 8;
+    private static final int SHORT_BYTES = Short.SIZE / 8;
+
+    private final Descriptor descriptor;
+    private final MappedBuffer file;
+    private final long startPos;
+    private final long treeMinToken;
+    private final long treeMaxToken;
+    private final long tokenCount;
+
+    @VisibleForTesting
+    protected TokenTree(MappedBuffer tokenTree)
+    {
+        this(Descriptor.CURRENT, tokenTree);
+    }
+
+    /**
+     * Reads the root header of the tree that starts at the buffer's current position.
+     *
+     * @param d descriptor whose version decides whether a magic number is expected
+     * @param tokenTree buffer positioned at the start of the serialized tree
+     * @throws IllegalArgumentException if header validation fails for the descriptor's version
+     *                                  (wrong magic or unknown version)
+     */
+    public TokenTree(Descriptor d, MappedBuffer tokenTree)
+    {
+        descriptor = d;
+        file = tokenTree;
+        startPos = file.position();
+
+        file.position(startPos + TokenTreeBuilder.SHARED_HEADER_BYTES);
+
+        if (!validateMagic())
+            throw new IllegalArgumentException("invalid token tree");
+
+        tokenCount = file.getLong();
+        treeMinToken = file.getLong();
+        treeMaxToken = file.getLong();
+    }
+
+    /**
+     * @return the number of tokens recorded in the root header
+     */
+    public long getCount()
+    {
+        return tokenCount;
+    }
+
+    /**
+     * Returns an iterator over the tree's tokens, leaf by leaf. The iterator works on
+     * {@code file.duplicate()} (presumably an independent cursor) rather than on the buffer used by
+     * {@link #get(long, Function)}.
+     *
+     * @param keyFetcher maps key offsets stored in the tree to {@link DecoratedKey}s
+     */
+    public RangeIterator<Long, Token> iterator(Function<Long, DecoratedKey> keyFetcher)
+    {
+        return new TokenTreeIterator(file.duplicate(), keyFetcher);
+    }
+
+    /**
+     * Looks up a single token.
+     *
+     * @param searchToken token to find
+     * @param keyFetcher maps key offsets stored in the tree to {@link DecoratedKey}s
+     * @return the on-disk token equal to {@code searchToken}, or {@code null} if the tree doesn't contain it
+     */
+    public OnDiskToken get(final long searchToken, Function<Long, DecoratedKey> keyFetcher)
+    {
+        seekToLeaf(searchToken, file);
+        long leafStart = file.position();
+        short leafSize = file.getShort(leafStart + 1); // skip the info byte
+
+        // an empty leaf has no entry to compare against; reading entry 0 would read past its data
+        if (leafSize <= 0)
+            return null;
+
+        file.position(leafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES); // skip to tokens
+        short tokenIndex = searchLeaf(searchToken, leafSize);
+
+        file.position(leafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES);
+
+        OnDiskToken token = OnDiskToken.getTokenAt(file, tokenIndex, leafSize, keyFetcher);
+        return token.get().equals(searchToken) ? token : null;
+    }
+
+    // checks the magic number (if the descriptor version has one) that follows the shared block header
+    private boolean validateMagic()
+    {
+        switch (descriptor.version.toString())
+        {
+            case Descriptor.VERSION_AA:
+                return true;
+            case Descriptor.VERSION_AB:
+                return TokenTreeBuilder.AB_MAGIC == file.getShort();
+            default:
+                return false;
+        }
+    }
+
+    // finds leaf that *could* contain token; on return the buffer is positioned at that leaf's start
+    private void seekToLeaf(long token, MappedBuffer file)
+    {
+        // this loop always seeks forward except for the first iteration
+        // where it may seek back to the root
+        long blockStart = startPos;
+        while (true)
+        {
+            file.position(blockStart);
+
+            byte info = file.get();
+            boolean isLeaf = (info & 1) == 1;
+
+            if (isLeaf)
+            {
+                file.position(blockStart);
+                break;
+            }
+
+            short tokenCount = file.getShort();
+
+            long minToken = file.getLong();
+            long maxToken = file.getLong();
+
+            long seekBase = blockStart + TokenTreeBuilder.BLOCK_HEADER_BYTES;
+            if (minToken > token)
+            {
+                // seek to beginning of child offsets to locate first child
+                file.position(seekBase + tokenCount * LONG_BYTES);
+                blockStart = (startPos + (int) file.getLong());
+            }
+            else if (maxToken < token)
+            {
+                // seek to end of child offsets to locate last child
+                file.position(seekBase + (2 * tokenCount) * LONG_BYTES);
+                blockStart = (startPos + (int) file.getLong());
+            }
+            else
+            {
+                // skip to end of block header/start of interior block tokens
+                file.position(seekBase);
+
+                short offsetIndex = searchBlock(token, tokenCount, file);
+
+                // searchBlock consumed (offsetIndex + 1) tokens, or all of them when offsetIndex == tokenCount;
+                // advance from there to child offset number offsetIndex (offsets follow the tokens)
+                if (offsetIndex == tokenCount)
+                    file.position(file.position() + (offsetIndex * LONG_BYTES));
+                else
+                    file.position(file.position() + ((tokenCount - offsetIndex - 1) + offsetIndex) * LONG_BYTES);
+
+                blockStart = (startPos + (int) file.getLong());
+            }
+        }
+    }
+
+    // linear scan of an interior block's tokens: returns the number of tokens <= searchToken,
+    // leaving the buffer just past the last token read
+    private short searchBlock(long searchToken, short tokenCount, MappedBuffer file)
+    {
+        short offsetIndex = 0;
+        for (int i = 0; i < tokenCount; i++)
+        {
+            long readToken = file.getLong();
+            if (searchToken < readToken)
+                break;
+
+            offsetIndex++;
+        }
+
+        return offsetIndex;
+    }
+
+    // binary search over the leaf entries starting at the buffer's current position; returns the index
+    // of the matching entry, or of the last probed entry (always within [0, tokenCount - 1]) if none matches
+    private short searchLeaf(long searchToken, short tokenCount)
+    {
+        long base = file.position();
+
+        int start = 0;
+        int end = tokenCount - 1; // last valid entry; index tokenCount would be past the leaf's entries
+        int middle = 0;
+
+        while (start <= end)
+        {
+            middle = start + ((end - start) >> 1);
+
+            // each entry is 16 bytes wide, token is in bytes 4-11
+            long token = file.getLong(base + (middle * (2 * LONG_BYTES) + 4));
+
+            if (token == searchToken)
+                break;
+
+            if (token < searchToken)
+                start = middle + 1;
+            else
+                end = middle - 1;
+        }
+
+        return (short) middle;
+    }
+
+    /**
+     * Iterates the leaves in block order, lazily positioning itself on the first leaf
+     * only when a token is first requested.
+     */
+    public class TokenTreeIterator extends RangeIterator<Long, Token>
+    {
+        private final Function<Long, DecoratedKey> keyFetcher;
+        private final MappedBuffer file;
+
+        private long currentLeafStart;
+        private int currentTokenIndex;
+
+        private long leafMinToken;
+        private long leafMaxToken;
+        private short leafSize;
+
+        protected boolean firstIteration = true;
+        private boolean lastLeaf;
+
+        TokenTreeIterator(MappedBuffer file, Function<Long, DecoratedKey> keyFetcher)
+        {
+            super(treeMinToken, treeMaxToken, tokenCount);
+
+            this.file = file;
+            this.keyFetcher = keyFetcher;
+        }
+
+        protected Token computeNext()
+        {
+            maybeFirstIteration();
+
+            if (currentTokenIndex >= leafSize && lastLeaf)
+                return endOfData();
+
+            if (currentTokenIndex < leafSize) // tokens remaining in this leaf
+            {
+                return getTokenAt(currentTokenIndex++);
+            }
+            else // no more tokens remaining in this leaf
+            {
+                assert !lastLeaf;
+
+                seekToNextLeaf();
+                setupBlock();
+                return computeNext();
+            }
+        }
+
+        protected void performSkipTo(Long nextToken)
+        {
+            maybeFirstIteration();
+
+            if (nextToken <= leafMaxToken) // next is in this leaf block
+            {
+                searchLeaf(nextToken);
+            }
+            else // next is in a leaf block that needs to be found
+            {
+                seekToLeaf(nextToken, file);
+                setupBlock();
+                findNearest(nextToken);
+            }
+        }
+
+        // reads the header of the leaf at the buffer's position and leaves the buffer at the leaf's first entry
+        private void setupBlock()
+        {
+            currentLeafStart = file.position();
+            currentTokenIndex = 0;
+
+            lastLeaf = (file.get() & (1 << TokenTreeBuilder.LAST_LEAF_SHIFT)) > 0;
+            leafSize = file.getShort();
+
+            leafMinToken = file.getLong();
+            leafMaxToken = file.getLong();
+
+            // seek to end of leaf header/start of data
+            file.position(currentLeafStart + TokenTreeBuilder.BLOCK_HEADER_BYTES);
+        }
+
+        // moves forward leaf by leaf until reaching the leaf that could hold next, then scans within it
+        private void findNearest(Long next)
+        {
+            if (next > leafMaxToken && !lastLeaf)
+            {
+                seekToNextLeaf();
+                setupBlock();
+                findNearest(next);
+            }
+            else if (next > leafMinToken)
+                searchLeaf(next);
+        }
+
+        // advances currentTokenIndex to the first token in this leaf that is >= next (or to leafSize)
+        private void searchLeaf(long next)
+        {
+            while (currentTokenIndex < leafSize && compareTokenAt(currentTokenIndex, next) < 0)
+                currentTokenIndex++;
+        }
+
+        private int compareTokenAt(int idx, long toToken)
+        {
+            return Long.compare(file.getLong(getTokenPosition(idx)), toToken);
+        }
+
+        private Token getTokenAt(int idx)
+        {
+            return OnDiskToken.getTokenAt(file, idx, leafSize, keyFetcher);
+        }
+
+        private long getTokenPosition(int idx)
+        {
+            // skip 4 byte entry header to get position pointing directly at the entry's token
+            return OnDiskToken.getEntryPosition(idx, file) + (2 * SHORT_BYTES);
+        }
+
+        private void seekToNextLeaf()
+        {
+            file.position(currentLeafStart + TokenTreeBuilder.BLOCK_BYTES);
+        }
+
+        public void close() throws IOException
+        {
+            // nothing to do here
+        }
+
+        private void maybeFirstIteration()
+        {
+            // seek to the first token only when requested for the first time,
+            // highly predictable branch and saves us a lot by not traversing the tree
+            // on creation time because it's not at all required.
+            if (!firstIteration)
+                return;
+
+            seekToLeaf(treeMinToken, file);
+            setupBlock();
+            firstIteration = false;
+        }
+    }
+
+    /**
+     * A token read from disk. Its keys are resolved lazily from one or more leaf entries
+     * (merged from several trees), plus any keys merged in from in-memory tokens.
+     */
+    public static class OnDiskToken extends Token
+    {
+        private final Set<TokenInfo> info = new HashSet<>(2);
+        private final Set<DecoratedKey> loadedKeys = new TreeSet<>(DecoratedKey.comparator);
+
+        /**
+         * @param buffer buffer positioned at the start of the leaf's entry data
+         * @param position absolute position of this token's 16-byte entry
+         */
+        public OnDiskToken(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
+        {
+            super(buffer.getLong(position + (2 * SHORT_BYTES)));
+            info.add(new TokenInfo(buffer, position, leafSize, keyFetcher));
+        }
+
+        public void merge(CombinedValue<Long> other)
+        {
+            if (!(other instanceof Token))
+                return;
+
+            Token o = (Token) other;
+            if (token != o.token)
+                throw new IllegalArgumentException(String.format("%s != %s", token, o.token));
+
+            if (o instanceof OnDiskToken)
+            {
+                info.addAll(((OnDiskToken) other).info);
+            }
+            else
+            {
+                Iterators.addAll(loadedKeys, o.iterator());
+            }
+        }
+
+        // merges keys from every source in DecoratedKey order, keeping one key per equal group
+        public Iterator<DecoratedKey> iterator()
+        {
+            List<Iterator<DecoratedKey>> keys = new ArrayList<>(info.size());
+
+            for (TokenInfo i : info)
+                keys.add(i.iterator());
+
+            if (!loadedKeys.isEmpty())
+                keys.add(loadedKeys.iterator());
+
+            return MergeIterator.get(keys, DecoratedKey.comparator, new MergeIterator.Reducer<DecoratedKey, DecoratedKey>()
+            {
+                DecoratedKey reduced = null;
+
+                public boolean trivialReduceIsTrivial()
+                {
+                    return true;
+                }
+
+                public void reduce(int idx, DecoratedKey current)
+                {
+                    reduced = current;
+                }
+
+                protected DecoratedKey getReduced()
+                {
+                    return reduced;
+                }
+            });
+        }
+
+        // union of the key offsets stored in all on-disk entries of this token
+        public Set<Long> getOffsets()
+        {
+            Set<Long> offsets = new HashSet<>();
+            for (TokenInfo i : info)
+            {
+                for (long offset : i.fetchOffsets())
+                    offsets.add(offset);
+            }
+
+            return offsets;
+        }
+
+        // builds the token for entry idx of the leaf whose entry data starts at the buffer's current position
+        public static OnDiskToken getTokenAt(MappedBuffer buffer, int idx, short leafSize, Function<Long, DecoratedKey> keyFetcher)
+        {
+            return new OnDiskToken(buffer, getEntryPosition(idx, buffer), leafSize, keyFetcher);
+        }
+
+        private static long getEntryPosition(int idx, MappedBuffer file)
+        {
+            // entry header (type + extra, 4 bytes) + token (8 bytes) + offset data (4 bytes) = 16 bytes
+            return file.position() + (idx * (2 * LONG_BYTES));
+        }
+    }
+
+    // location of a single leaf entry, decoded into key offsets on demand
+    private static class TokenInfo
+    {
+        private final MappedBuffer buffer;
+        private final Function<Long, DecoratedKey> keyFetcher;
+
+        private final long position;
+        private final short leafSize;
+        // start of the leaf's entry data, captured now: offsets are fetched lazily and by then the shared
+        // buffer may have been repositioned (e.g. onto another leaf), which broke overflow lookups
+        private final long leafDataStart;
+
+        public TokenInfo(MappedBuffer buffer, long position, short leafSize, Function<Long, DecoratedKey> keyFetcher)
+        {
+            this.keyFetcher = keyFetcher;
+            this.buffer = buffer;
+            this.position = position;
+            this.leafSize = leafSize;
+            this.leafDataStart = buffer.position();
+        }
+
+        public Iterator<DecoratedKey> iterator()
+        {
+            return new KeyIterator(keyFetcher, fetchOffsets());
+        }
+
+        // equality identifies the same on-disk entry so merged duplicates are only iterated once
+        public int hashCode()
+        {
+            return new HashCodeBuilder().append(keyFetcher).append(position).append(leafSize).build();
+        }
+
+        public boolean equals(Object other)
+        {
+            if (!(other instanceof TokenInfo))
+                return false;
+
+            TokenInfo o = (TokenInfo) other;
+            return keyFetcher == o.keyFetcher && position == o.position && leafSize == o.leafSize;
+        }
+
+        // decodes the key offsets of this entry according to its entry type
+        private long[] fetchOffsets()
+        {
+            short info = buffer.getShort(position);
+            // read as unsigned: for FACTORED entries this holds the low 16 bits of the offset,
+            // which must not be sign-extended when recombined
+            int offsetExtra = buffer.getShort(position + SHORT_BYTES) & 0xFFFF;
+            int offsetData = buffer.getInt(position + (2 * SHORT_BYTES) + LONG_BYTES);
+
+            EntryType type = EntryType.of(info & TokenTreeBuilder.ENTRY_TYPE_MASK);
+
+            switch (type)
+            {
+                case SIMPLE:
+                    return new long[] { offsetData };
+
+                case OVERFLOW:
+                    long[] offsets = new long[offsetExtra]; // offsetExtra contains count of offsets
+                    // overflow trailer follows the leaf's entries; offsetData is the index into it
+                    long offsetPos = (leafDataStart + (2 * (leafSize * LONG_BYTES)) + (offsetData * LONG_BYTES));
+
+                    for (int i = 0; i < offsetExtra; i++)
+                        offsets[i] = buffer.getLong(offsetPos + (i * LONG_BYTES));
+
+                    return offsets;
+
+                case FACTORED:
+                    return new long[] { (((long) offsetData) << Short.SIZE) | offsetExtra };
+
+                case PACKED:
+                    return new long[] { offsetExtra, offsetData };
+
+                default:
+                    throw new IllegalStateException("Unknown entry type: " + type);
+            }
+        }
+    }
+
+    // resolves each stored offset to a key through keyFetcher, in array order
+    private static class KeyIterator extends AbstractIterator<DecoratedKey>
+    {
+        private final Function<Long, DecoratedKey> keyFetcher;
+        private final long[] offsets;
+        private int index = 0;
+
+        public KeyIterator(Function<Long, DecoratedKey> keyFetcher, long[] offsets)
+        {
+            this.keyFetcher = keyFetcher;
+            this.offsets = offsets;
+        }
+
+        public DecoratedKey computeNext()
+        {
+            return index < offsets.length ? keyFetcher.apply(offsets[index++]) : endOfData();
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
new file mode 100644
index 0000000..e10b057
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/disk/TokenTreeBuilder.java
@@ -0,0 +1,839 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.disk;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import com.carrotsearch.hppc.LongArrayList;
+import com.carrotsearch.hppc.LongSet;
+import com.carrotsearch.hppc.cursors.LongCursor;
+import com.carrotsearch.hppc.LongOpenHashSet;
+import com.google.common.collect.AbstractIterator;
+
+/**
+ * Builds the on-disk token tree read by {@link TokenTree}: a B+-tree-like structure of
+ * {@value #BLOCK_BYTES}-byte blocks whose leaves map each token to the key offsets added for it.
+ * Tokens are accumulated with {@code add}, the tree is built by {@link #finish()} and written by
+ * {@link #write(DataOutputPlus)}. A tree that fits in a single leaf is written without block padding.
+ */
+public class TokenTreeBuilder
+{
+    // note: ordinal positions are used here, do not change order
+    enum EntryType
+    {
+        SIMPLE, FACTORED, PACKED, OVERFLOW;
+
+        public static EntryType of(int ordinal)
+        {
+            if (ordinal == SIMPLE.ordinal())
+                return SIMPLE;
+
+            if (ordinal == FACTORED.ordinal())
+                return FACTORED;
+
+            if (ordinal == PACKED.ordinal())
+                return PACKED;
+
+            if (ordinal == OVERFLOW.ordinal())
+                return OVERFLOW;
+
+            throw new IllegalArgumentException("Unknown ordinal: " + ordinal);
+        }
+    }
+
+    public static final int BLOCK_BYTES = 4096;
+    public static final int BLOCK_HEADER_BYTES = 64;
+    public static final int OVERFLOW_TRAILER_BYTES = 64;
+    public static final int OVERFLOW_TRAILER_CAPACITY = OVERFLOW_TRAILER_BYTES / 8;
+    public static final int TOKENS_PER_BLOCK = (BLOCK_BYTES - BLOCK_HEADER_BYTES - OVERFLOW_TRAILER_BYTES) / 16;
+    public static final long MAX_OFFSET = (1L << 47) - 1; // 48 bits for (signed) offset
+    public static final byte LAST_LEAF_SHIFT = 1;
+    // info byte (1) + token count short (2) + min token (8) + max token (8)
+    public static final byte SHARED_HEADER_BYTES = 19;
+    public static final byte ENTRY_TYPE_MASK = 0x03;
+    public static final short AB_MAGIC = 0x5A51;
+
+    // one leaf entry: type (2) + extra (2) + token (8) + offset data (4)
+    private static final int LEAF_ENTRY_BYTES = 16;
+    // one offset in a leaf's overflow trailer
+    private static final int OVERFLOW_ENTRY_BYTES = 8;
+
+    private final SortedMap<Long, LongSet> tokens = new TreeMap<>();
+    private int numBlocks;
+
+    private Node root;
+    private InteriorNode rightmostParent;
+    private Leaf leftmostLeaf;
+    private Leaf rightmostLeaf;
+    private long tokenCount = 0;
+    private long treeMinToken;
+    private long treeMaxToken;
+
+    public TokenTreeBuilder()
+    {}
+
+    public TokenTreeBuilder(SortedMap<Long, LongSet> data)
+    {
+        add(data);
+    }
+
+    // records keyPosition as one of the key offsets for token
+    public void add(Long token, long keyPosition)
+    {
+        LongSet found = tokens.get(token);
+        if (found == null)
+            tokens.put(token, (found = new LongOpenHashSet(2)));
+
+        found.add(keyPosition);
+    }
+
+    // merges every (token, offsets) pair of data into this builder
+    public void add(SortedMap<Long, LongSet> data)
+    {
+        for (Map.Entry<Long, LongSet> newEntry : data.entrySet())
+        {
+            LongSet found = tokens.get(newEntry.getKey());
+            if (found == null)
+                tokens.put(newEntry.getKey(), (found = new LongOpenHashSet(4)));
+
+            for (LongCursor offset : newEntry.getValue())
+                found.add(offset.value);
+        }
+    }
+
+    // builds the tree from the accumulated tokens (only the first call has an effect)
+    public TokenTreeBuilder finish()
+    {
+        maybeBulkLoad();
+        return this;
+    }
+
+    public SortedMap<Long, LongSet> getTokens()
+    {
+        return tokens;
+    }
+
+    public long getTokenCount()
+    {
+        return tokenCount;
+    }
+
+    /**
+     * @return the number of bytes {@link #write(DataOutputPlus)} will emit for the finished tree. A
+     *         single-leaf tree is unpadded (header, entries, then its overflow trailer); larger trees are
+     *         whole blocks.
+     */
+    public int serializedSize()
+    {
+        if (numBlocks == 1)
+        {
+            // the overflow trailer is derived from the token data so this is correct even before write()
+            int overflowOffsets = root.isLeaf() ? ((Leaf) root).overflowOffsetCount() : 0;
+            return BLOCK_HEADER_BYTES
+                   + ((int) tokenCount * LEAF_ENTRY_BYTES)
+                   + (overflowOffsets * OVERFLOW_ENTRY_BYTES);
+        }
+        else
+        {
+            return numBlocks * BLOCK_BYTES;
+        }
+    }
+
+    // writes the tree level by level, root first, each node as one block
+    public void write(DataOutputPlus out) throws IOException
+    {
+        ByteBuffer blockBuffer = ByteBuffer.allocate(BLOCK_BYTES);
+        Iterator<Node> levelIterator = root.levelIterator();
+        long childBlockIndex = 1;
+
+        while (levelIterator != null)
+        {
+
+            Node firstChild = null;
+            while (levelIterator.hasNext())
+            {
+                Node block = levelIterator.next();
+
+                if (firstChild == null && !block.isLeaf())
+                    firstChild = ((InteriorNode) block).children.get(0);
+
+                block.serialize(childBlockIndex, blockBuffer);
+                flushBuffer(blockBuffer, out, numBlocks != 1);
+
+                childBlockIndex += block.childCount();
+            }
+
+            levelIterator = (firstChild == null) ? null : firstChild.levelIterator();
+        }
+    }
+
+    public Iterator<Pair<Long, LongSet>> iterator()
+    {
+        return new TokenIterator(leftmostLeaf.levelIterator());
+    }
+
+    private void maybeBulkLoad()
+    {
+        if (root == null)
+            bulkLoad();
+    }
+
+    private void flushBuffer(ByteBuffer buffer, DataOutputPlus o, boolean align) throws IOException
+    {
+        // seek to end of last block before flushing
+        if (align)
+            alignBuffer(buffer, BLOCK_BYTES);
+
+        buffer.flip();
+        o.write(buffer);
+        buffer.clear();
+    }
+
+    // advances the buffer position to the next multiple of blockSize (a power of two)
+    private static void alignBuffer(ByteBuffer buffer, int blockSize)
+    {
+        long curPos = buffer.position();
+        if ((curPos & (blockSize - 1)) != 0) // align on the block boundary if needed
+            buffer.position((int) FBUtilities.align(curPos, blockSize));
+    }
+
+    private void bulkLoad()
+    {
+        tokenCount = tokens.size();
+        treeMinToken = tokens.firstKey();
+        treeMaxToken = tokens.lastKey();
+        numBlocks = 1;
+
+        // special case the tree that only has a single block in it (so we don't create a useless root)
+        if (tokenCount <= TOKENS_PER_BLOCK)
+        {
+            leftmostLeaf = new Leaf(tokens);
+            rightmostLeaf = leftmostLeaf;
+            root = leftmostLeaf;
+        }
+        else
+        {
+            root = new InteriorNode();
+            rightmostParent = (InteriorNode) root;
+
+            int i = 0;
+            Leaf lastLeaf = null;
+            Long firstToken = tokens.firstKey();
+            Long finalToken = tokens.lastKey();
+            Long lastToken;
+            for (Long token : tokens.keySet())
+            {
+                // cut a leaf every TOKENS_PER_BLOCK tokens and at the final token
+                if (i == 0 || (i % TOKENS_PER_BLOCK != 0 && i != (tokenCount - 1)))
+                {
+                    i++;
+                    continue;
+                }
+
+                // the leaf covers [firstToken, lastToken); keys are unique and iterated in order, so the
+                // token at index tokenCount - 1 is always finalToken and gets its own leaf below
+                lastToken = token;
+                Leaf leaf = new Leaf(tokens.subMap(firstToken, lastToken));
+
+                if (i == TOKENS_PER_BLOCK)
+                    leftmostLeaf = leaf;
+                else
+                    lastLeaf.next = leaf;
+
+                rightmostParent.add(leaf);
+                lastLeaf = leaf;
+                rightmostLeaf = leaf;
+                firstToken = lastToken;
+                i++;
+                numBlocks++;
+
+                if (token.equals(finalToken))
+                {
+                    Leaf finalLeaf = new Leaf(tokens.tailMap(token));
+                    lastLeaf.next = finalLeaf;
+                    rightmostParent.add(finalLeaf);
+                    rightmostLeaf = finalLeaf;
+                    numBlocks++;
+                }
+            }
+
+        }
+    }
+
+    // a tree block; nodes on the same level are chained through next
+    private abstract class Node
+    {
+        protected InteriorNode parent;
+        protected Node next;
+        protected Long nodeMinToken, nodeMaxToken;
+
+        public abstract void serialize(long childBlockIndex, ByteBuffer buf);
+        public abstract int childCount();
+        public abstract int tokenCount();
+        public abstract Long smallestToken();
+
+        public Iterator<Node> levelIterator()
+        {
+            return new LevelIterator(this);
+        }
+
+        public boolean isLeaf()
+        {
+            return (this instanceof Leaf);
+        }
+
+        protected boolean isLastLeaf()
+        {
+            return this == rightmostLeaf;
+        }
+
+        protected boolean isRoot()
+        {
+            return this == root;
+        }
+
+        protected void updateTokenRange(long token)
+        {
+            nodeMinToken = nodeMinToken == null ? token : Math.min(nodeMinToken, token);
+            nodeMaxToken = nodeMaxToken == null ? token : Math.max(nodeMaxToken, token);
+        }
+
+        // writes the header for this node's kind and pads it to BLOCK_HEADER_BYTES
+        protected void serializeHeader(ByteBuffer buf)
+        {
+            Header header;
+            if (isRoot())
+                header = new RootHeader();
+            else if (!isLeaf())
+                header = new InteriorNodeHeader();
+            else
+                header = new LeafHeader();
+
+            header.serialize(buf);
+            alignBuffer(buf, BLOCK_HEADER_BYTES);
+        }
+
+        private abstract class Header
+        {
+            // info byte, token count, node min/max token: SHARED_HEADER_BYTES in total
+            public void serialize(ByteBuffer buf)
+            {
+                buf.put(infoByte())
+                   .putShort((short) (tokenCount()))
+                   .putLong(nodeMinToken)
+                   .putLong(nodeMaxToken);
+            }
+
+            protected abstract byte infoByte();
+        }
+
+        private class RootHeader extends Header
+        {
+            public void serialize(ByteBuffer buf)
+            {
+                super.serialize(buf);
+                writeMagic(buf);
+                buf.putLong(tokenCount)
+                   .putLong(treeMinToken)
+                   .putLong(treeMaxToken);
+            }
+
+            protected byte infoByte()
+            {
+                // if leaf, set leaf indicator and last leaf indicator (bits 0 & 1)
+                // if not leaf, clear both bits
+                return (byte) ((isLeaf()) ? 3 : 0);
+            }
+
+            protected void writeMagic(ByteBuffer buf)
+            {
+                switch (Descriptor.CURRENT_VERSION)
+                {
+                    case Descriptor.VERSION_AB:
+                        buf.putShort(AB_MAGIC);
+                        break;
+                    default:
+                        break;
+                }
+
+            }
+        }
+
+        private class InteriorNodeHeader extends Header
+        {
+            // bit 0 (leaf indicator) & bit 1 (last leaf indicator) cleared
+            protected byte infoByte()
+            {
+                return 0;
+            }
+        }
+
+        private class LeafHeader extends Header
+        {
+            // bit 0 set as leaf indicator
+            // bit 1 set if this is last leaf of data
+            protected byte infoByte()
+            {
+                byte infoByte = 1;
+                infoByte |= (isLastLeaf()) ? (1 << LAST_LEAF_SHIFT) : 0;
+
+                return infoByte;
+            }
+        }
+
+    }
+
+    private class Leaf extends Node
+    {
+        private final SortedMap<Long, LongSet> tokens;
+        // offsets that did not fit into their 16-byte entry; filled while serializeData() runs and
+        // written out as this leaf's overflow trailer
+        private LongArrayList overflowCollisions;
+
+        Leaf(SortedMap<Long, LongSet> data)
+        {
+            nodeMinToken = data.firstKey();
+            nodeMaxToken = data.lastKey();
+            tokens = data;
+        }
+
+        public Long largestToken()
+        {
+            return nodeMaxToken;
+        }
+
+        public void serialize(long childBlockIndex, ByteBuffer buf)
+        {
+            // start from an empty trailer so serializing this leaf again (e.g. a second write())
+            // doesn't append duplicate offsets, shift overflow indices or overrun the trailer capacity
+            overflowCollisions = null;
+
+            serializeHeader(buf);
+            serializeData(buf);
+            serializeOverflowCollisions(buf);
+        }
+
+        public int childCount()
+        {
+            return 0;
+        }
+
+        public int tokenCount()
+        {
+            return tokens.size();
+        }
+
+        public Long smallestToken()
+        {
+            return nodeMinToken;
+        }
+
+        public Iterator<Map.Entry<Long, LongSet>> tokenIterator()
+        {
+            return tokens.entrySet().iterator();
+        }
+
+        // number of offsets serialize() will place in this leaf's overflow trailer, computed from the
+        // token data (mirrors createEntry) so it's available before the leaf has been serialized
+        private int overflowOffsetCount()
+        {
+            int count = 0;
+            for (LongSet offsets : tokens.values())
+            {
+                int size = offsets.size();
+                if (size > 2 || (size == 2 && !canPack(offsets.toArray())))
+                    count += size;
+            }
+            return count;
+        }
+
+        private void serializeData(ByteBuffer buf)
+        {
+            for (Map.Entry<Long, LongSet> entry : tokens.entrySet())
+                createEntry(entry.getKey(), entry.getValue()).serialize(buf);
+        }
+
+        private void serializeOverflowCollisions(ByteBuffer buf)
+        {
+            if (overflowCollisions != null)
+                for (LongCursor offset : overflowCollisions)
+                    buf.putLong(offset.value);
+        }
+
+        // two offsets share one entry when both fit in an int and at least one fits in a short
+        private boolean canPack(long[] rawOffsets)
+        {
+            return rawOffsets[0] <= Integer.MAX_VALUE && rawOffsets[1] <= Integer.MAX_VALUE &&
+                   (rawOffsets[0] <= Short.MAX_VALUE || rawOffsets[1] <= Short.MAX_VALUE);
+        }
+
+        // picks the most compact entry encoding for the token's offsets
+        private LeafEntry createEntry(final long tok, final LongSet offsets)
+        {
+            int offsetCount = offsets.size();
+            switch (offsetCount)
+            {
+                case 0:
+                    throw new AssertionError("no offsets for token " + tok);
+                case 1:
+                    long offset = offsets.toArray()[0];
+                    if (offset > MAX_OFFSET)
+                        throw new AssertionError("offset " + offset + " cannot be greater than " + MAX_OFFSET);
+                    else if (offset <= Integer.MAX_VALUE)
+                        return new SimpleLeafEntry(tok, offset);
+                    else
+                        return new FactoredOffsetLeafEntry(tok, offset);
+                case 2:
+                    long[] rawOffsets = offsets.toArray();
+                    if (canPack(rawOffsets))
+                        return new PackedCollisionLeafEntry(tok, rawOffsets);
+                    else
+                        return createOverflowEntry(tok, offsetCount, offsets);
+                default:
+                    return createOverflowEntry(tok, offsetCount, offsets);
+            }
+        }
+
+        // appends the offsets to the overflow trailer and returns an entry pointing at them
+        private LeafEntry createOverflowEntry(final long tok, final int offsetCount, final LongSet offsets)
+        {
+            if (overflowCollisions == null)
+                overflowCollisions = new LongArrayList();
+
+            LeafEntry entry = new OverflowCollisionLeafEntry(tok, (short) overflowCollisions.size(), (short) offsetCount);
+            for (LongCursor o : offsets)
+            {
+                if (overflowCollisions.size() == OVERFLOW_TRAILER_CAPACITY)
+                    throw new AssertionError("cannot have more than " + OVERFLOW_TRAILER_CAPACITY + " overflow collisions per leaf");
+                else
+                    overflowCollisions.add(o.value);
+            }
+            return entry;
+        }
+
+        private abstract class LeafEntry
+        {
+            protected final long token;
+
+            abstract public EntryType type();
+            abstract public int offsetData();
+            abstract public short offsetExtra();
+
+            public LeafEntry(final long tok)
+            {
+                token = tok;
+            }
+
+            // 16 bytes: type (2), extra (2), token (8), offset data (4)
+            public void serialize(ByteBuffer buf)
+            {
+                buf.putShort((short) type().ordinal())
+                   .putShort(offsetExtra())
+                   .putLong(token)
+                   .putInt(offsetData());
+            }
+
+        }
+
+
+        // assumes there is a single offset and the offset is <= Integer.MAX_VALUE
+        private class SimpleLeafEntry extends LeafEntry
+        {
+            private final long offset;
+
+            public SimpleLeafEntry(final long tok, final long off)
+            {
+                super(tok);
+                offset = off;
+            }
+
+            public EntryType type()
+            {
+                return EntryType.SIMPLE;
+            }
+
+            public int offsetData()
+            {
+                return (int) offset;
+            }
+
+            public short offsetExtra()
+            {
+                return 0;
+            }
+        }
+
+        // assumes there is a single offset and Integer.MAX_VALUE < offset <= MAX_OFFSET
+        // take the middle 32 bits of offset (or the top 32 when considering offset is max 48 bits)
+        // and store where offset is normally stored. take bottom 16 bits of offset and store in entry header
+        private class FactoredOffsetLeafEntry extends LeafEntry
+        {
+            private final long offset;
+
+            public FactoredOffsetLeafEntry(final long tok, final long off)
+            {
+                super(tok);
+                offset = off;
+            }
+
+            public EntryType type()
+            {
+                return EntryType.FACTORED;
+            }
+
+            public int offsetData()
+            {
+                return (int) (offset >>> Short.SIZE);
+            }
+
+            public short offsetExtra()
+            {
+                return (short) offset;
+            }
+        }
+
+        // holds an entry with two offsets that can be packed in an int & a short
+        // the int offset is stored where offset is normally stored. short offset is
+        // stored in entry header
+        private class PackedCollisionLeafEntry extends LeafEntry
+        {
+            private short smallerOffset;
+            private int largerOffset;
+
+            public PackedCollisionLeafEntry(final long tok, final long[] offs)
+            {
+                super(tok);
+
+                smallerOffset = (short) Math.min(offs[0], offs[1]);
+                largerOffset = (int) Math.max(offs[0], offs[1]);
+            }
+
+            public EntryType type()
+            {
+                return EntryType.PACKED;
+            }
+
+            public int offsetData()
+            {
+                return largerOffset;
+            }
+
+            public short offsetExtra()
+            {
+                return smallerOffset;
+            }
+        }
+
+        // holds an entry with three or more offsets, or two offsets that cannot
+        // be packed into an int & a short. the index into the overflow list
+        // is stored where the offset is normally stored. the number of overflowed offsets
+        // for the entry is stored in the entry header
+        private class OverflowCollisionLeafEntry extends LeafEntry
+        {
+            private final short startIndex;
+            private final short count;
+
+            public OverflowCollisionLeafEntry(final long tok, final short collisionStartIndex, final short collisionCount)
+            {
+                super(tok);
+                startIndex = collisionStartIndex;
+                count = collisionCount;
+            }
+
+            public EntryType type()
+            {
+                return EntryType.OVERFLOW;
+            }
+
+            public int offsetData()
+            {
+                return startIndex;
+            }
+
+            public short offsetExtra()
+            {
+                return count;
+            }
+
+        }
+
+    }
+
+    // interior block: separator tokens plus one child block per slot
+    private class InteriorNode extends Node
+    {
+        private List<Long> tokens = new ArrayList<>(TOKENS_PER_BLOCK);
+        private List<Node> children = new ArrayList<>(TOKENS_PER_BLOCK + 1);
+        private int position = 0; // TODO (jwest): can get rid of this and use array size
+
+
+        public void serialize(long childBlockIndex, ByteBuffer buf)
+        {
+            serializeHeader(buf);
+            serializeTokens(buf);
+            serializeChildOffsets(childBlockIndex, buf);
+        }
+
+        public int childCount()
+        {
+            return children.size();
+        }
+
+        public int tokenCount()
+        {
+            return tokens.size();
+        }
+
+        public Long smallestToken()
+        {
+            return tokens.get(0);
+        }
+
+        // inserts a separator (and its children) pushed up from a split, splitting this node if it is full
+        protected void add(Long token, InteriorNode leftChild, InteriorNode rightChild)
+        {
+            int pos = tokens.size();
+            if (pos == TOKENS_PER_BLOCK)
+            {
+                InteriorNode sibling = split();
+                sibling.add(token, leftChild, rightChild);
+
+            }
+            else
+            {
+                if (leftChild != null)
+                    children.add(pos, leftChild);
+
+                if (rightChild != null)
+                {
+                    children.add(pos + 1, rightChild);
+                    rightChild.parent = this;
+                }
+
+                updateTokenRange(token);
+                tokens.add(pos, token);
+            }
+        }
+
+        // appends a leaf during bulk load, splitting into a new rightmost parent when full
+        protected void add(Leaf node)
+        {
+
+            if (position == (TOKENS_PER_BLOCK + 1))
+            {
+                rightmostParent = split();
+                rightmostParent.add(node);
+            }
+            else
+            {
+
+                node.parent = this;
+                children.add(position, node);
+                position++;
+
+                // the first child is referenced only during bulk load. we don't take a value
+                // to store into the tree, one is subtracted since position has already been incremented
+                // for the next node to be added
+                if (position - 1 == 0)
+                    return;
+
+
+                // tokens are inserted one behind the current position, but 2 is subtracted because
+                // position has already been incremented for the next add
+                Long smallestToken = node.smallestToken();
+                updateTokenRange(smallestToken);
+                tokens.add(position - 2, smallestToken);
+            }
+
+        }
+
+        // splits this node, pushing the middle token to the parent (creating a new root if needed)
+        protected InteriorNode split()
+        {
+            Pair<Long, InteriorNode> splitResult = splitBlock();
+            Long middleValue = splitResult.left;
+            InteriorNode sibling = splitResult.right;
+            InteriorNode leftChild = null;
+
+            // create a new root if necessary
+            if (parent == null)
+            {
+                parent = new InteriorNode();
+                root = parent;
+                sibling.parent = parent;
+                leftChild = this;
+                numBlocks++;
+            }
+
+            parent.add(middleValue, leftChild, sibling);
+
+            return sibling;
+        }
+
+        // moves the upper part of this node's tokens/children into a new right sibling
+        protected Pair<Long, InteriorNode> splitBlock()
+        {
+            final int splitPosition = TOKENS_PER_BLOCK - 2;
+            InteriorNode sibling = new InteriorNode();
+            sibling.parent = parent;
+            next = sibling;
+
+            Long middleValue = tokens.get(splitPosition);
+
+            for (int i = splitPosition; i < TOKENS_PER_BLOCK; i++)
+            {
+                if (i != TOKENS_PER_BLOCK && i != splitPosition)
+                {
+                    long token = tokens.get(i);
+                    sibling.updateTokenRange(token);
+                    sibling.tokens.add(token);
+                }
+
+                Node child = children.get(i + 1);
+                child.parent = sibling;
+                sibling.children.add(child);
+                sibling.position++;
+            }
+
+            for (int i = TOKENS_PER_BLOCK; i >= splitPosition; i--)
+            {
+                if (i != TOKENS_PER_BLOCK)
+                    tokens.remove(i);
+
+                if (i != splitPosition)
+                    children.remove(i);
+            }
+
+            nodeMinToken = smallestToken();
+            nodeMaxToken = tokens.get(tokens.size() - 1);
+            numBlocks++;
+
+            return Pair.create(middleValue, sibling);
+        }
+
+        protected boolean isFull()
+        {
+            return (position >= TOKENS_PER_BLOCK + 1);
+        }
+
+        private void serializeTokens(ByteBuffer buf)
+        {
+            for (Long token : tokens)
+                buf.putLong(token);
+        }
+
+
+        // children are written contiguously, so child i lives at block (childBlockIndex + i)
+        private void serializeChildOffsets(long childBlockIndex, ByteBuffer buf)
+        {
+            for (int i = 0; i < children.size(); i++)
+                buf.putLong((childBlockIndex + i) * BLOCK_BYTES);
+        }
+    }
+
+    // walks one tree level from the given node through the next links
+    public static class LevelIterator extends AbstractIterator<Node>
+    {
+        private Node currentNode;
+
+        LevelIterator(Node first)
+        {
+            currentNode = first;
+        }
+
+        public Node computeNext()
+        {
+            if (currentNode == null)
+                return endOfData();
+
+            Node returnNode = currentNode;
+            currentNode = returnNode.next;
+
+            return returnNode;
+        }
+
+
+    }
+
+    // yields (token, offsets) pairs across a chain of leaves in order
+    public static class TokenIterator extends AbstractIterator<Pair<Long, LongSet>>
+    {
+        private Iterator<Node> levelIterator;
+        private Iterator<Map.Entry<Long, LongSet>> currentIterator;
+
+        TokenIterator(Iterator<Node> level)
+        {
+            levelIterator = level;
+            if (levelIterator.hasNext())
+                currentIterator = ((Leaf) levelIterator.next()).tokenIterator();
+        }
+
+        public Pair<Long, LongSet> computeNext()
+        {
+            if (currentIterator != null && currentIterator.hasNext())
+            {
+                Map.Entry<Long, LongSet> next = currentIterator.next();
+                return Pair.create(next.getKey(), next.getValue());
+            }
+            else
+            {
+                if (!levelIterator.hasNext())
+                    return endOfData();
+                else
+                {
+                    currentIterator = ((Leaf) levelIterator.next()).tokenIterator();
+                    return computeNext();
+                }
+            }
+
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/exceptions/TimeQuotaExceededException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/exceptions/TimeQuotaExceededException.java b/src/java/org/apache/cassandra/index/sasi/exceptions/TimeQuotaExceededException.java
new file mode 100644
index 0000000..af577dc
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/exceptions/TimeQuotaExceededException.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.exceptions;
+
/**
 * Unchecked exception with no message or cause.
 *
 * NOTE(review): judging by the name, this is presumably thrown by the SASI query path
 * (e.g. from QueryController#checkpoint) when a query exceeds its allotted time. Confirm
 * against the throwing site.
 */
public class TimeQuotaExceededException extends RuntimeException
{
    // Throwable is Serializable; pin the stream version explicitly instead of relying on the
    // compiler-dependent computed default.
    private static final long serialVersionUID = 1L;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
new file mode 100644
index 0000000..cf7f3a5
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/memory/IndexMemtable.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.memory;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.index.sasi.utils.TypeUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IndexMemtable
+{
+ private static final Logger logger = LoggerFactory.getLogger(IndexMemtable.class);
+
+ private final MemIndex index;
+
+ public IndexMemtable(ColumnIndex columnIndex)
+ {
+ this.index = MemIndex.forColumn(columnIndex.keyValidator(), columnIndex);
+ }
+
+ public long index(DecoratedKey key, ByteBuffer value)
+ {
+ if (value == null || value.remaining() == 0)
+ return 0;
+
+ AbstractType<?> validator = index.columnIndex.getValidator();
+ if (!TypeUtil.isValid(value, validator))
+ {
+ int size = value.remaining();
+ if ((value = TypeUtil.tryUpcast(value, validator)) == null)
+ {
+ logger.error("Can't add column {} to index for key: {}, value size {} bytes, validator: {}.",
+ index.columnIndex.getColumnName(),
+ index.columnIndex.keyValidator().getString(key.getKey()),
+ size,
+ validator);
+ return 0;
+ }
+ }
+
+ return index.add(key, value);
+ }
+
+ public RangeIterator<Long, Token> search(Expression expression)
+ {
+ return index == null ? null : index.search(expression);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
new file mode 100644
index 0000000..293e2ee
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/memory/KeyRangeIterator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.memory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.utils.AbstractIterator;
+import org.apache.cassandra.index.sasi.utils.CombinedValue;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+
+import com.google.common.collect.PeekingIterator;
+
+public class KeyRangeIterator extends RangeIterator<Long, Token>
+{
+ private final DKIterator iterator;
+
+ public KeyRangeIterator(ConcurrentSkipListSet<DecoratedKey> keys)
+ {
+ super((Long) keys.first().getToken().getTokenValue(), (Long) keys.last().getToken().getTokenValue(), keys.size());
+ this.iterator = new DKIterator(keys.iterator());
+ }
+
+ protected Token computeNext()
+ {
+ return iterator.hasNext() ? new DKToken(iterator.next()) : endOfData();
+ }
+
+ protected void performSkipTo(Long nextToken)
+ {
+ while (iterator.hasNext())
+ {
+ DecoratedKey key = iterator.peek();
+ if (Long.compare((long) key.getToken().getTokenValue(), nextToken) >= 0)
+ break;
+
+ // consume smaller key
+ iterator.next();
+ }
+ }
+
+ public void close() throws IOException
+ {}
+
+ private static class DKIterator extends AbstractIterator<DecoratedKey> implements PeekingIterator<DecoratedKey>
+ {
+ private final Iterator<DecoratedKey> keys;
+
+ public DKIterator(Iterator<DecoratedKey> keys)
+ {
+ this.keys = keys;
+ }
+
+ protected DecoratedKey computeNext()
+ {
+ return keys.hasNext() ? keys.next() : endOfData();
+ }
+ }
+
+ private static class DKToken extends Token
+ {
+ private final SortedSet<DecoratedKey> keys;
+
+ public DKToken(final DecoratedKey key)
+ {
+ super((long) key.getToken().getTokenValue());
+
+ keys = new TreeSet<DecoratedKey>(DecoratedKey.comparator)
+ {{
+ add(key);
+ }};
+ }
+
+ public void merge(CombinedValue<Long> other)
+ {
+ if (!(other instanceof Token))
+ return;
+
+ Token o = (Token) other;
+ assert o.get().equals(token);
+
+ if (o instanceof DKToken)
+ {
+ keys.addAll(((DKToken) o).keys);
+ }
+ else
+ {
+ for (DecoratedKey key : o)
+ keys.add(key);
+ }
+ }
+
+ public Iterator<DecoratedKey> iterator()
+ {
+ return keys.iterator();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
new file mode 100644
index 0000000..22d6c9e
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/memory/MemIndex.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.memory;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+import org.github.jamm.MemoryMeter;
+
+public abstract class MemIndex
+{
+ protected final AbstractType<?> keyValidator;
+ protected final ColumnIndex columnIndex;
+
+ protected MemIndex(AbstractType<?> keyValidator, ColumnIndex columnIndex)
+ {
+ this.keyValidator = keyValidator;
+ this.columnIndex = columnIndex;
+ }
+
+ public abstract long add(DecoratedKey key, ByteBuffer value);
+ public abstract RangeIterator<Long, Token> search(Expression expression);
+
+ public static MemIndex forColumn(AbstractType<?> keyValidator, ColumnIndex columnIndex)
+ {
+ return columnIndex.isLiteral()
+ ? new TrieMemIndex(keyValidator, columnIndex)
+ : new SkipListMemIndex(keyValidator, columnIndex);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
new file mode 100644
index 0000000..69b57d0
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/memory/SkipListMemIndex.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.memory;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public class SkipListMemIndex extends MemIndex
+{
+ public static final int CSLM_OVERHEAD = 128; // average overhead of CSLM
+
+ private final ConcurrentSkipListMap<ByteBuffer, ConcurrentSkipListSet<DecoratedKey>> index;
+
+ public SkipListMemIndex(AbstractType<?> keyValidator, ColumnIndex columnIndex)
+ {
+ super(keyValidator, columnIndex);
+ index = new ConcurrentSkipListMap<>(columnIndex.getValidator());
+ }
+
+ public long add(DecoratedKey key, ByteBuffer value)
+ {
+ long overhead = CSLM_OVERHEAD; // DKs are shared
+ ConcurrentSkipListSet<DecoratedKey> keys = index.get(value);
+
+ if (keys == null)
+ {
+ ConcurrentSkipListSet<DecoratedKey> newKeys = new ConcurrentSkipListSet<>(DecoratedKey.comparator);
+ keys = index.putIfAbsent(value, newKeys);
+ if (keys == null)
+ {
+ overhead += CSLM_OVERHEAD + value.remaining();
+ keys = newKeys;
+ }
+ }
+
+ keys.add(key);
+
+ return overhead;
+ }
+
+ public RangeIterator<Long, Token> search(Expression expression)
+ {
+ ByteBuffer min = expression.lower == null ? null : expression.lower.value;
+ ByteBuffer max = expression.upper == null ? null : expression.upper.value;
+
+ SortedMap<ByteBuffer, ConcurrentSkipListSet<DecoratedKey>> search;
+
+ if (min == null && max == null)
+ {
+ throw new IllegalArgumentException();
+ }
+ if (min != null && max != null)
+ {
+ search = index.subMap(min, expression.lower.inclusive, max, expression.upper.inclusive);
+ }
+ else if (min == null)
+ {
+ search = index.headMap(max, expression.upper.inclusive);
+ }
+ else
+ {
+ search = index.tailMap(min, expression.lower.inclusive);
+ }
+
+ RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+ search.values().stream()
+ .filter(keys -> !keys.isEmpty())
+ .forEach(keys -> builder.add(new KeyRangeIterator(keys)));
+
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
new file mode 100644
index 0000000..e4ee6eb
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/memory/TrieMemIndex.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.memory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
+import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.db.marshal.AbstractType;
+
+import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
+import com.googlecode.concurrenttrees.suffix.ConcurrentSuffixTree;
+import com.googlecode.concurrenttrees.radix.node.concrete.SmartArrayBasedNodeFactory;
+import com.googlecode.concurrenttrees.radix.node.Node;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.index.sasi.memory.SkipListMemIndex.CSLM_OVERHEAD;
+
+public class TrieMemIndex extends MemIndex
+{
+ private static final Logger logger = LoggerFactory.getLogger(TrieMemIndex.class);
+
+ private final ConcurrentTrie index;
+
+ public TrieMemIndex(AbstractType<?> keyValidator, ColumnIndex columnIndex)
+ {
+ super(keyValidator, columnIndex);
+
+ switch (columnIndex.getMode().mode)
+ {
+ case CONTAINS:
+ index = new ConcurrentSuffixTrie(columnIndex.getDefinition());
+ break;
+
+ case PREFIX:
+ index = new ConcurrentPrefixTrie(columnIndex.getDefinition());
+ break;
+
+ default:
+ throw new IllegalStateException("Unsupported mode: " + columnIndex.getMode().mode);
+ }
+ }
+
+ public long add(DecoratedKey key, ByteBuffer value)
+ {
+ AbstractAnalyzer analyzer = columnIndex.getAnalyzer();
+ analyzer.reset(value.duplicate());
+
+ long size = 0;
+ while (analyzer.hasNext())
+ {
+ ByteBuffer term = analyzer.next();
+
+ if (term.remaining() >= OnDiskIndexBuilder.MAX_TERM_SIZE)
+ {
+ logger.info("Can't add term of column {} to index for key: {}, term size {} bytes, max allowed size {} bytes, use analyzed = true (if not yet set) for that column.",
+ columnIndex.getColumnName(),
+ keyValidator.getString(key.getKey()),
+ term.remaining(),
+ OnDiskIndexBuilder.MAX_TERM_SIZE);
+ continue;
+ }
+
+ size += index.add(columnIndex.getValidator().getString(term), key);
+ }
+
+ return size;
+ }
+
+ public RangeIterator<Long, Token> search(Expression expression)
+ {
+ return index.search(expression);
+ }
+
+ private static abstract class ConcurrentTrie
+ {
+ public static final SizeEstimatingNodeFactory NODE_FACTORY = new SizeEstimatingNodeFactory();
+
+ protected final ColumnDefinition definition;
+
+ public ConcurrentTrie(ColumnDefinition column)
+ {
+ definition = column;
+ }
+
+ public long add(String value, DecoratedKey key)
+ {
+ long overhead = CSLM_OVERHEAD;
+ ConcurrentSkipListSet<DecoratedKey> keys = get(value);
+ if (keys == null)
+ {
+ ConcurrentSkipListSet<DecoratedKey> newKeys = new ConcurrentSkipListSet<>(DecoratedKey.comparator);
+ keys = putIfAbsent(value, newKeys);
+ if (keys == null)
+ {
+ overhead += CSLM_OVERHEAD + value.length();
+ keys = newKeys;
+ }
+ }
+
+ keys.add(key);
+
+ // get and reset new memory size allocated by current thread
+ overhead += NODE_FACTORY.currentUpdateSize();
+ NODE_FACTORY.reset();
+
+ return overhead;
+ }
+
+ public RangeIterator<Long, Token> search(Expression expression)
+ {
+ assert expression.getOp() == Expression.Op.EQ; // means that min == max
+
+ ByteBuffer prefix = expression.lower == null ? null : expression.lower.value;
+
+ Iterable<ConcurrentSkipListSet<DecoratedKey>> search = search(definition.cellValueType().getString(prefix));
+
+ RangeUnionIterator.Builder<Long, Token> builder = RangeUnionIterator.builder();
+ for (ConcurrentSkipListSet<DecoratedKey> keys : search)
+ {
+ if (!keys.isEmpty())
+ builder.add(new KeyRangeIterator(keys));
+ }
+
+ return builder.build();
+ }
+
+ protected abstract ConcurrentSkipListSet<DecoratedKey> get(String value);
+ protected abstract Iterable<ConcurrentSkipListSet<DecoratedKey>> search(String value);
+ protected abstract ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> key);
+ }
+
+ protected static class ConcurrentPrefixTrie extends ConcurrentTrie
+ {
+ private final ConcurrentRadixTree<ConcurrentSkipListSet<DecoratedKey>> trie;
+
+ private ConcurrentPrefixTrie(ColumnDefinition column)
+ {
+ super(column);
+ trie = new ConcurrentRadixTree<>(NODE_FACTORY);
+ }
+
+ public ConcurrentSkipListSet<DecoratedKey> get(String value)
+ {
+ return trie.getValueForExactKey(value);
+ }
+
+ public ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> newKeys)
+ {
+ return trie.putIfAbsent(value, newKeys);
+ }
+
+ public Iterable<ConcurrentSkipListSet<DecoratedKey>> search(String value)
+ {
+ return trie.getValuesForKeysStartingWith(value);
+ }
+ }
+
+ protected static class ConcurrentSuffixTrie extends ConcurrentTrie
+ {
+ private final ConcurrentSuffixTree<ConcurrentSkipListSet<DecoratedKey>> trie;
+
+ private ConcurrentSuffixTrie(ColumnDefinition column)
+ {
+ super(column);
+ trie = new ConcurrentSuffixTree<>(NODE_FACTORY);
+ }
+
+ public ConcurrentSkipListSet<DecoratedKey> get(String value)
+ {
+ return trie.getValueForExactKey(value);
+ }
+
+ public ConcurrentSkipListSet<DecoratedKey> putIfAbsent(String value, ConcurrentSkipListSet<DecoratedKey> newKeys)
+ {
+ return trie.putIfAbsent(value, newKeys);
+ }
+
+ public Iterable<ConcurrentSkipListSet<DecoratedKey>> search(String value)
+ {
+ return trie.getValuesForKeysContaining(value);
+ }
+ }
+
+ // This relies on the fact that all of the tree updates are done under exclusive write lock,
+ // method would overestimate in certain circumstances e.g. when nodes are replaced in place,
+ // but it's still better comparing to underestimate since it gives more breathing room for other memory users.
+ private static class SizeEstimatingNodeFactory extends SmartArrayBasedNodeFactory
+ {
+ private final ThreadLocal<Long> updateSize = ThreadLocal.withInitial(() -> 0L);
+
+ public Node createNode(CharSequence edgeCharacters, Object value, List<Node> childNodes, boolean isRoot)
+ {
+ Node node = super.createNode(edgeCharacters, value, childNodes, isRoot);
+ updateSize.set(updateSize.get() + measure(node));
+ return node;
+ }
+
+ public long currentUpdateSize()
+ {
+ return updateSize.get();
+ }
+
+ public void reset()
+ {
+ updateSize.set(0L);
+ }
+
+ private long measure(Node node)
+ {
+ // node with max overhead is CharArrayNodeLeafWithValue = 24B
+ long overhead = 24;
+
+ // array of chars (2 bytes) + CharSequence overhead
+ overhead += 24 + node.getIncomingEdge().length() * 2;
+
+ if (node.getOutgoingEdges() != null)
+ {
+ // 16 bytes for AtomicReferenceArray
+ overhead += 16;
+ overhead += 24 * node.getOutgoingEdges().size();
+ }
+
+ return overhead;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/72790dc8/src/java/org/apache/cassandra/index/sasi/plan/Expression.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/Expression.java b/src/java/org/apache/cassandra/index/sasi/plan/Expression.java
new file mode 100644
index 0000000..e215ec7
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/sasi/plan/Expression.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi.plan;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.index.sasi.analyzer.AbstractAnalyzer;
+import org.apache.cassandra.index.sasi.conf.ColumnIndex;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndex;
+import org.apache.cassandra.index.sasi.utils.TypeUtil;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Expression
+{
+ private static final Logger logger = LoggerFactory.getLogger(Expression.class);
+
+ public enum Op
+ {
+ EQ, NOT_EQ, RANGE
+ }
+
+ private final QueryController controller;
+
+ public final AbstractAnalyzer analyzer;
+
+ public final ColumnIndex index;
+ public final AbstractType<?> validator;
+ public final boolean isLiteral;
+
+ @VisibleForTesting
+ protected Op operation;
+
+ public Bound lower, upper;
+ public List<ByteBuffer> exclusions = new ArrayList<>();
+
+ public Expression(Expression other)
+ {
+ this(other.controller, other.index);
+ operation = other.operation;
+ }
+
+ public Expression(QueryController controller, ColumnIndex columnIndex)
+ {
+ this.controller = controller;
+ this.index = columnIndex;
+ this.analyzer = columnIndex.getAnalyzer();
+ this.validator = columnIndex.getValidator();
+ this.isLiteral = columnIndex.isLiteral();
+ }
+
+ @VisibleForTesting
+ public Expression(String name, AbstractType<?> validator)
+ {
+ this(null, new ColumnIndex(UTF8Type.instance, ColumnDefinition.regularDef("sasi", "internal", name, validator), null));
+ }
+
+ public Expression setLower(Bound newLower)
+ {
+ lower = newLower == null ? null : new Bound(newLower.value, newLower.inclusive);
+ return this;
+ }
+
+ public Expression setUpper(Bound newUpper)
+ {
+ upper = newUpper == null ? null : new Bound(newUpper.value, newUpper.inclusive);
+ return this;
+ }
+
+ public Expression setOp(Op op)
+ {
+ this.operation = op;
+ return this;
+ }
+
+ public Expression add(Operator op, ByteBuffer value)
+ {
+ boolean lowerInclusive = false, upperInclusive = false;
+ switch (op)
+ {
+ case EQ:
+ lower = new Bound(value, true);
+ upper = lower;
+ operation = Op.EQ;
+ break;
+
+ case NEQ:
+ // index expressions are priority sorted
+ // and NOT_EQ is the lowest priority, which means that operation type
+ // is always going to be set before reaching it in case of RANGE or EQ.
+ if (operation == null)
+ {
+ operation = Op.NOT_EQ;
+ lower = new Bound(value, true);
+ upper = lower;
+ }
+ else
+ exclusions.add(value);
+ break;
+
+ case LTE:
+ upperInclusive = true;
+ case LT:
+ operation = Op.RANGE;
+ upper = new Bound(value, upperInclusive);
+ break;
+
+ case GTE:
+ lowerInclusive = true;
+ case GT:
+ operation = Op.RANGE;
+ lower = new Bound(value, lowerInclusive);
+ break;
+ }
+
+ return this;
+ }
+
+ public Expression addExclusion(ByteBuffer value)
+ {
+ exclusions.add(value);
+ return this;
+ }
+
+ public boolean contains(ByteBuffer value)
+ {
+ if (!TypeUtil.isValid(value, validator))
+ {
+ int size = value.remaining();
+ if ((value = TypeUtil.tryUpcast(value, validator)) == null)
+ {
+ logger.error("Can't cast value for {} to size accepted by {}, value size is {} bytes.",
+ index.getColumnName(),
+ validator,
+ size);
+ return false;
+ }
+ }
+
+ if (lower != null)
+ {
+ // suffix check
+ if (isLiteral)
+ {
+ if (!validateStringValue(value, lower.value))
+ return false;
+ }
+ else
+ {
+ // range or (not-)equals - (mainly) for numeric values
+ int cmp = validator.compare(lower.value, value);
+
+ // in case of (NOT_)EQ lower == upper
+ if (operation == Op.EQ || operation == Op.NOT_EQ)
+ return cmp == 0;
+
+ if (cmp > 0 || (cmp == 0 && !lower.inclusive))
+ return false;
+ }
+ }
+
+ if (upper != null && lower != upper)
+ {
+ // string (prefix or suffix) check
+ if (isLiteral)
+ {
+ if (!validateStringValue(value, upper.value))
+ return false;
+ }
+ else
+ {
+ // range - mainly for numeric values
+ int cmp = validator.compare(upper.value, value);
+ if (cmp < 0 || (cmp == 0 && !upper.inclusive))
+ return false;
+ }
+ }
+
+ // as a last step let's check exclusions for the given field,
+ // this covers EQ/RANGE with exclusions.
+ for (ByteBuffer term : exclusions)
+ {
+ if (isLiteral && validateStringValue(value, term))
+ return false;
+ else if (validator.compare(term, value) == 0)
+ return false;
+ }
+
+ return true;
+ }
+
+ private boolean validateStringValue(ByteBuffer columnValue, ByteBuffer requestedValue)
+ {
+ analyzer.reset(columnValue.duplicate());
+ while (analyzer.hasNext())
+ {
+ ByteBuffer term = analyzer.next();
+ if (ByteBufferUtil.contains(term, requestedValue))
+ return true;
+ }
+
+ return false;
+ }
+
+ public Op getOp()
+ {
+ return operation;
+ }
+
+ public void checkpoint()
+ {
+ if (controller == null)
+ return;
+
+ controller.checkpoint();
+ }
+
+ public boolean hasLower()
+ {
+ return lower != null;
+ }
+
+ public boolean hasUpper()
+ {
+ return upper != null;
+ }
+
+ public boolean isLowerSatisfiedBy(OnDiskIndex.DataTerm term)
+ {
+ if (!hasLower())
+ return true;
+
+ int cmp = term.compareTo(validator, lower.value, false);
+ return cmp > 0 || cmp == 0 && lower.inclusive;
+ }
+
+ public boolean isUpperSatisfiedBy(OnDiskIndex.DataTerm term)
+ {
+ if (!hasUpper())
+ return true;
+
+ int cmp = term.compareTo(validator, upper.value, false);
+ return cmp < 0 || cmp == 0 && upper.inclusive;
+ }
+
+ public boolean isIndexed()
+ {
+ return index.isIndexed();
+ }
+
+ public String toString()
+ {
+ return String.format("Expression{name: %s, op: %s, lower: (%s, %s), upper: (%s, %s), exclusions: %s}",
+ index.getColumnName(),
+ operation,
+ lower == null ? "null" : validator.getString(lower.value),
+ lower != null && lower.inclusive,
+ upper == null ? "null" : validator.getString(upper.value),
+ upper != null && upper.inclusive,
+ Iterators.toString(Iterators.transform(exclusions.iterator(), validator::getString)));
+ }
+
+ public int hashCode()
+ {
+ return new HashCodeBuilder().append(index.getColumnName())
+ .append(operation)
+ .append(validator)
+ .append(lower).append(upper)
+ .append(exclusions).build();
+ }
+
+ public boolean equals(Object other)
+ {
+ if (!(other instanceof Expression))
+ return false;
+
+ if (this == other)
+ return true;
+
+ Expression o = (Expression) other;
+
+ return Objects.equals(index.getColumnName(), o.index.getColumnName())
+ && validator.equals(o.validator)
+ && operation == o.operation
+ && Objects.equals(lower, o.lower)
+ && Objects.equals(upper, o.upper)
+ && exclusions.equals(o.exclusions);
+ }
+
+
+ public static class Bound
+ {
+ public final ByteBuffer value;
+ public final boolean inclusive;
+
+ public Bound(ByteBuffer value, boolean inclusive)
+ {
+ this.value = value;
+ this.inclusive = inclusive;
+ }
+
+ public boolean equals(Object other)
+ {
+ if (!(other instanceof Bound))
+ return false;
+
+ Bound o = (Bound) other;
+ return value.equals(o.value) && inclusive == o.inclusive;
+ }
+ }
+}