You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/03/07 10:06:59 UTC
kylin git commit: Rename FuzzyRowFilter to
org/apache/kylin/storage/hbase/cube/v2/FuzzyRowFilter.java
Repository: kylin
Updated Branches:
refs/heads/shaofeng-hbasefuzzyfilter 13fc3e648 -> bbdf433af
Rename FuzzyRowFilter to org/apache/kylin/storage/hbase/cube/v2/FuzzyRowFilter.java
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bbdf433a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bbdf433a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bbdf433a
Branch: refs/heads/shaofeng-hbasefuzzyfilter
Commit: bbdf433af915105333a67d4881715a88758a66c3
Parents: 13fc3e6
Author: shaofengshi <sh...@apache.org>
Authored: Tue Mar 7 18:06:49 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 7 18:06:49 2017 +0800
----------------------------------------------------------------------
pom.xml | 2 +-
.../hadoop/hbase/filter/FuzzyRowFilter.java | 636 ------------------
.../storage/hbase/cube/v2/CubeHBaseRPC.java | 1 -
.../storage/hbase/cube/v2/FuzzyRowFilter.java | 638 +++++++++++++++++++
4 files changed, 639 insertions(+), 638 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/bbdf433a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0ee5f03..6d3425e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
<hive-hcatalog.version>1.2.1</hive-hcatalog.version>
<!-- HBase versions -->
- <hbase-hadoop2.version>1.1.2</hbase-hadoop2.version>
+ <hbase-hadoop2.version>1.1.1</hbase-hadoop2.version>
<kafka.version>0.10.0.0</kafka.version>
<!-- Hadoop deps, keep compatible with hadoop2.version -->
http://git-wip-us.apache.org/repos/asf/kylin/blob/bbdf433a/storage-hbase/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java b/storage-hbase/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java
deleted file mode 100644
index e34ebc4..0000000
--- a/storage-hbase/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java
+++ /dev/null
@@ -1,636 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.hadoop.hbase.filter;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
-import org.apache.hadoop.hbase.util.ByteStringer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.UnsafeAccess;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-/**
- * This is an optimized version of the standard FuzzyRowFilter. It filters data based on a fuzzy row key.
- * Performs fast-forwards during scanning. It takes pairs (row key, fuzzy info) to match row keys.
- * Where fuzzy info is a byte array with 0 or 1 as its values:
- * <ul>
- * <li>0 - means that this byte in provided row key is fixed, i.e. row key's byte at same position
- * must match</li>
- * <li>1 - means that this byte in provided row key is NOT fixed, i.e. row key's byte at this
- * position can be different from the one in provided row key</li>
- * </ul>
- * Example: Let's assume row key format is userId_actionId_year_month. Length of userId is fixed and
- * is 4, length of actionId is 2 and year and month are 4 and 2 bytes long respectively. Let's
- * assume that we need to fetch all users that performed certain action (encoded as "99") in Jan of
- * any year. Then the pair (row key, fuzzy info) would be the following: row key = "????_99_????_01"
- * (one can use any value instead of "?") fuzzy info =
- * "\x01\x01\x01\x01\x00\x00\x00\x00\x01\x01\x01\x01\x00\x00\x00" I.e. fuzzy info tells the matching
- * mask is "????_99_????_01", where at ? can be any value.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class FuzzyRowFilter extends FilterBase {
- private List<Pair<byte[], byte[]>> fuzzyKeysData;
- private boolean done = false;
-
- /**
- * The index of the last successfully matched fuzzy key (in fuzzyKeysData). We will start
- * matching the next KV with this one. If they do not match then we fall back to the one-by-one
- * iteration over fuzzyKeysData.
- */
- private int lastFoundIndex = -1;
-
- /**
- * Row tracker (keeps all next rows after SEEK_NEXT_USING_HINT was returned)
- */
- private RowTracker tracker;
-
- public FuzzyRowFilter(List<Pair<byte[], byte[]>> fuzzyKeysData) {
- Pair<byte[], byte[]> p;
- for (int i = 0; i < fuzzyKeysData.size(); i++) {
- p = fuzzyKeysData.get(i);
- if (p.getFirst().length != p.getSecond().length) {
- Pair<String, String> readable =
- new Pair<String, String>(Bytes.toStringBinary(p.getFirst()), Bytes.toStringBinary(p
- .getSecond()));
- throw new IllegalArgumentException("Fuzzy pair lengths do not match: " + readable);
- }
- // update mask ( 0 -> -1 (0xff), 1 -> 0)
- p.setSecond(preprocessMask(p.getSecond()));
- preprocessSearchKey(p);
- }
- this.fuzzyKeysData = fuzzyKeysData;
- this.tracker = new RowTracker();
- }
-
- private void preprocessSearchKey(Pair<byte[], byte[]> p) {
-// if (UnsafeAccess.unaligned() == false) {
-// return;
-// }
- byte[] key = p.getFirst();
- byte[] mask = p.getSecond();
- for (int i = 0; i < mask.length; i++) {
- // set non-fixed part of a search key to 0.
- if (mask[i] == 0) key[i] = 0;
- }
- }
-
- /**
- * We need to preprocess the mask array, since we treat 0's as unfixed positions and -1 (0xff) as
- * fixed positions.
- * @param mask
- * @return mask array
- */
- private byte[] preprocessMask(byte[] mask) {
-// if (UnsafeAccess.unaligned() == false) {
-// return mask;
-// }
- if (isPreprocessedMask(mask)) return mask;
- for (int i = 0; i < mask.length; i++) {
- if (mask[i] == 0) {
- mask[i] = -1; // 0 -> -1
- } else if (mask[i] == 1) {
- mask[i] = 0;// 1 -> 0
- }
- }
- return mask;
- }
-
- private boolean isPreprocessedMask(byte[] mask) {
- for (int i = 0; i < mask.length; i++) {
- if (mask[i] != -1 && mask[i] != 0) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public ReturnCode filterKeyValue(Cell c) {
- final int startIndex = lastFoundIndex >= 0 ? lastFoundIndex : 0;
- final int size = fuzzyKeysData.size();
- for (int i = startIndex; i < size + startIndex; i++) {
- final int index = i % size;
- Pair<byte[], byte[]> fuzzyData = fuzzyKeysData.get(index);
- SatisfiesCode satisfiesCode =
- satisfies(isReversed(), c.getRowArray(), c.getRowOffset(), c.getRowLength(),
- fuzzyData.getFirst(), fuzzyData.getSecond());
- if (satisfiesCode == SatisfiesCode.YES) {
- lastFoundIndex = index;
- return ReturnCode.INCLUDE;
- }
- }
- // NOT FOUND -> seek next using hint
- lastFoundIndex = -1;
-
- return ReturnCode.SEEK_NEXT_USING_HINT;
-
- }
-
- @Override
- public Cell getNextCellHint(Cell currentCell) {
- boolean result = tracker.updateTracker(currentCell);
- if (result == false) {
- done = true;
- return null;
- }
- byte[] nextRowKey = tracker.nextRow();
- return KeyValueUtil.createFirstOnRow(nextRowKey);
- }
-
- /**
- * If we have multiple fuzzy keys, the row tracker should improve overall performance. It calculates
- * all next rows (one per fuzzy key) and puts them (bundled with their fuzzy key) into a priority
- * queue so that the smallest row key (largest for a reversed scan) always appears at the queue head,
- * which helps to decide the "Next Cell Hint". As scanning goes on, the number of candidate rows in
- * the RowTracker will remain equal to the number of fuzzy keys until some of the fuzzy keys cannot
- * possibly have any more matches.
- */
- private class RowTracker {
- private final PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>> nextRows;
- private boolean initialized = false;
-
- RowTracker() {
- nextRows =
- new PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>>(fuzzyKeysData.size(),
- new Comparator<Pair<byte[], Pair<byte[], byte[]>>>() {
- @Override
- public int compare(Pair<byte[], Pair<byte[], byte[]>> o1,
- Pair<byte[], Pair<byte[], byte[]>> o2) {
- int compare = Bytes.compareTo(o1.getFirst(), o2.getFirst());
- if (!isReversed()) {
- return compare;
- } else {
- return -compare;
- }
- }
- });
- }
-
- byte[] nextRow() {
- if (nextRows.isEmpty()) {
- throw new IllegalStateException(
- "NextRows should not be empty, make sure to call nextRow() after updateTracker() return true");
- } else {
- return nextRows.peek().getFirst();
- }
- }
-
- boolean updateTracker(Cell currentCell) {
- if (!initialized) {
- for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
- updateWith(currentCell, fuzzyData);
- }
- initialized = true;
- } else {
- while (!nextRows.isEmpty() && !lessThan(currentCell, nextRows.peek().getFirst())) {
- Pair<byte[], Pair<byte[], byte[]>> head = nextRows.poll();
- Pair<byte[], byte[]> fuzzyData = head.getSecond();
- updateWith(currentCell, fuzzyData);
- }
- }
- return !nextRows.isEmpty();
- }
-
- boolean lessThan(Cell currentCell, byte[] nextRowKey) {
- int compareResult =
- Bytes.compareTo(currentCell.getRowArray(), currentCell.getRowOffset(),
- currentCell.getRowLength(), nextRowKey, 0, nextRowKey.length);
- return (!isReversed() && compareResult < 0) || (isReversed() && compareResult > 0);
- }
-
- void updateWith(Cell currentCell, Pair<byte[], byte[]> fuzzyData) {
- byte[] nextRowKeyCandidate =
- getNextForFuzzyRule(isReversed(), currentCell.getRowArray(), currentCell.getRowOffset(),
- currentCell.getRowLength(), fuzzyData.getFirst(), fuzzyData.getSecond());
- if (nextRowKeyCandidate != null) {
- nextRows.add(new Pair<byte[], Pair<byte[], byte[]>>(nextRowKeyCandidate, fuzzyData));
- }
- }
-
- }
-
- @Override
- public boolean filterAllRemaining() {
- return done;
- }
-
- /**
- * @return The filter serialized using pb
- */
- public byte[] toByteArray() {
- FilterProtos.FuzzyRowFilter.Builder builder = FilterProtos.FuzzyRowFilter.newBuilder();
- for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
- BytesBytesPair.Builder bbpBuilder = BytesBytesPair.newBuilder();
- bbpBuilder.setFirst(ByteStringer.wrap(fuzzyData.getFirst()));
- bbpBuilder.setSecond(ByteStringer.wrap(fuzzyData.getSecond()));
- builder.addFuzzyKeysData(bbpBuilder);
- }
- return builder.build().toByteArray();
- }
-
- /**
- * @param pbBytes A pb serialized {@link FuzzyRowFilter} instance
- * @return An instance of {@link FuzzyRowFilter} made from <code>pbBytes</code>
- * @throws DeserializationException
- * @see #toByteArray
- */
- public static FuzzyRowFilter parseFrom(final byte[] pbBytes) throws DeserializationException {
- FilterProtos.FuzzyRowFilter proto;
- try {
- proto = FilterProtos.FuzzyRowFilter.parseFrom(pbBytes);
- } catch (InvalidProtocolBufferException e) {
- throw new DeserializationException(e);
- }
- int count = proto.getFuzzyKeysDataCount();
- ArrayList<Pair<byte[], byte[]>> fuzzyKeysData = new ArrayList<Pair<byte[], byte[]>>(count);
- for (int i = 0; i < count; ++i) {
- BytesBytesPair current = proto.getFuzzyKeysData(i);
- byte[] keyBytes = current.getFirst().toByteArray();
- byte[] keyMeta = current.getSecond().toByteArray();
- fuzzyKeysData.add(new Pair<byte[], byte[]>(keyBytes, keyMeta));
- }
- return new FuzzyRowFilter(fuzzyKeysData);
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("FuzzyRowFilter");
- sb.append("{fuzzyKeysData=");
- for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
- sb.append('{').append(Bytes.toStringBinary(fuzzyData.getFirst())).append(":");
- sb.append(Bytes.toStringBinary(fuzzyData.getSecond())).append('}');
- }
- sb.append("}, ");
- return sb.toString();
- }
-
- // Utility methods
-
- static enum SatisfiesCode {
- /** row satisfies fuzzy rule */
- YES,
- /** row doesn't satisfy fuzzy rule, but there's possible greater row that does */
- NEXT_EXISTS,
- /** row doesn't satisfy fuzzy rule and there's no greater row that does */
- NO_NEXT
- }
-
- @VisibleForTesting
- static SatisfiesCode satisfies(byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
- return satisfies(false, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
- }
-
- @VisibleForTesting
- static SatisfiesCode satisfies(boolean reverse, byte[] row, byte[] fuzzyKeyBytes,
- byte[] fuzzyKeyMeta) {
- return satisfies(reverse, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
- }
-
- static SatisfiesCode satisfies(boolean reverse, byte[] row, int offset, int length,
- byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
-
-// if (UnsafeAccess.unaligned() == false) {
-// return satisfiesNoUnsafe(reverse, row, offset, length, fuzzyKeyBytes, fuzzyKeyMeta);
-// }
-
- if (row == null) {
- // do nothing, let scan to proceed
- return SatisfiesCode.YES;
- }
- length = Math.min(length, fuzzyKeyBytes.length);
- int numWords = length / Bytes.SIZEOF_LONG;
- int offsetAdj = offset + UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
-
- int j = numWords << 3; // numWords * SIZEOF_LONG;
-
- for (int i = 0; i < j; i += Bytes.SIZEOF_LONG) {
-
- long fuzzyBytes =
- UnsafeAccess.theUnsafe.getLong(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
- + (long) i);
- long fuzzyMeta =
- UnsafeAccess.theUnsafe.getLong(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
- + (long) i);
- long rowValue = UnsafeAccess.theUnsafe.getLong(row, offsetAdj + (long) i);
- if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
- // We always return NEXT_EXISTS
- return SatisfiesCode.NEXT_EXISTS;
- }
- }
-
- int off = j;
-
- if (length - off >= Bytes.SIZEOF_INT) {
- int fuzzyBytes =
- UnsafeAccess.theUnsafe.getInt(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
- + (long) off);
- int fuzzyMeta =
- UnsafeAccess.theUnsafe.getInt(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
- + (long) off);
- int rowValue = UnsafeAccess.theUnsafe.getInt(row, offsetAdj + (long) off);
- if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
- // We always return NEXT_EXISTS
- return SatisfiesCode.NEXT_EXISTS;
- }
- off += Bytes.SIZEOF_INT;
- }
-
- if (length - off >= Bytes.SIZEOF_SHORT) {
- short fuzzyBytes =
- UnsafeAccess.theUnsafe.getShort(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
- + (long) off);
- short fuzzyMeta =
- UnsafeAccess.theUnsafe.getShort(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
- + (long) off);
- short rowValue = UnsafeAccess.theUnsafe.getShort(row, offsetAdj + (long) off);
- if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
- // We always return NEXT_EXISTS
- // even if it does not (in this case getNextForFuzzyRule
- // will return null)
- return SatisfiesCode.NEXT_EXISTS;
- }
- off += Bytes.SIZEOF_SHORT;
- }
-
- if (length - off >= Bytes.SIZEOF_BYTE) {
- int fuzzyBytes = fuzzyKeyBytes[off] & 0xff;
- int fuzzyMeta = fuzzyKeyMeta[off] & 0xff;
- int rowValue = row[offset + off] & 0xff;
- if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
- // We always return NEXT_EXISTS
- return SatisfiesCode.NEXT_EXISTS;
- }
- }
- return SatisfiesCode.YES;
- }
-
- static SatisfiesCode satisfiesNoUnsafe(boolean reverse, byte[] row, int offset, int length,
- byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
- if (row == null) {
- // do nothing, let scan to proceed
- return SatisfiesCode.YES;
- }
-
- Order order = Order.orderFor(reverse);
- boolean nextRowKeyCandidateExists = false;
-
- for (int i = 0; i < fuzzyKeyMeta.length && i < length; i++) {
- // First, checking if this position is fixed and not equals the given one
- boolean byteAtPositionFixed = fuzzyKeyMeta[i] == 0;
- boolean fixedByteIncorrect = byteAtPositionFixed && fuzzyKeyBytes[i] != row[i + offset];
- if (fixedByteIncorrect) {
- // in this case there's another row that satisfies fuzzy rule and bigger than this row
- if (nextRowKeyCandidateExists) {
- return SatisfiesCode.NEXT_EXISTS;
- }
-
- // If this row byte is less than fixed then there's a byte array bigger than
- // this row and which satisfies the fuzzy rule. Otherwise there's no such byte array:
- // this row is simply bigger than any byte array that satisfies the fuzzy rule
- boolean rowByteLessThanFixed = (row[i + offset] & 0xFF) < (fuzzyKeyBytes[i] & 0xFF);
- if (rowByteLessThanFixed && !reverse) {
- return SatisfiesCode.NEXT_EXISTS;
- } else if (!rowByteLessThanFixed && reverse) {
- return SatisfiesCode.NEXT_EXISTS;
- } else {
- return SatisfiesCode.NO_NEXT;
- }
- }
-
- // Second, checking if this position is not fixed and byte value is not the biggest. In this
- // case there's a byte array bigger than this row and which satisfies the fuzzy rule. To get
- // bigger byte array that satisfies the rule we need to just increase this byte
- // (see the code of getNextForFuzzyRule below) by one.
- // Note: if non-fixed byte is already at biggest value, this doesn't allow us to say there's
- // bigger one that satisfies the rule as it can't be increased.
- if (fuzzyKeyMeta[i] == 1 && !order.isMax(fuzzyKeyBytes[i])) {
- nextRowKeyCandidateExists = true;
- }
- }
- return SatisfiesCode.YES;
- }
-
- @VisibleForTesting
- static byte[] getNextForFuzzyRule(byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
- return getNextForFuzzyRule(false, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
- }
-
- @VisibleForTesting
- static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, byte[] fuzzyKeyBytes,
- byte[] fuzzyKeyMeta) {
- return getNextForFuzzyRule(reverse, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
- }
-
- /** Abstracts directional comparisons based on scan direction. */
- private enum Order {
- ASC {
- public boolean lt(int lhs, int rhs) {
- return lhs < rhs;
- }
-
- public boolean gt(int lhs, int rhs) {
- return lhs > rhs;
- }
-
- public byte inc(byte val) {
- // TODO: what about over/underflow?
- return (byte) (val + 1);
- }
-
- public boolean isMax(byte val) {
- return val == (byte) 0xff;
- }
-
- public byte min() {
- return 0;
- }
- },
- DESC {
- public boolean lt(int lhs, int rhs) {
- return lhs > rhs;
- }
-
- public boolean gt(int lhs, int rhs) {
- return lhs < rhs;
- }
-
- public byte inc(byte val) {
- // TODO: what about over/underflow?
- return (byte) (val - 1);
- }
-
- public boolean isMax(byte val) {
- return val == 0;
- }
-
- public byte min() {
- return (byte) 0xFF;
- }
- };
-
- public static Order orderFor(boolean reverse) {
- return reverse ? DESC : ASC;
- }
-
- /** Returns true when {@code lhs < rhs}. */
- public abstract boolean lt(int lhs, int rhs);
-
- /** Returns true when {@code lhs > rhs}. */
- public abstract boolean gt(int lhs, int rhs);
-
- /** Returns {@code val} incremented by 1. */
- public abstract byte inc(byte val);
-
- /** Return true when {@code val} is the maximum value */
- public abstract boolean isMax(byte val);
-
- /** Return the minimum value according to this ordering scheme. */
- public abstract byte min();
- }
-
- /**
- * @return greater byte array than given (row) which satisfies the fuzzy rule if it exists, null
- * otherwise
- */
- @VisibleForTesting
- static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, int offset, int length,
- byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
- // To find out the next "smallest" byte array that satisfies fuzzy rule and "greater" than
- // the given one we do the following:
- // 1. setting values on all "fixed" positions to the values from fuzzyKeyBytes
- // 2. if during the first step given row did not increase, then we increase the value at
- // the first "non-fixed" position (where it is not maximum already)
-
- // It is easier to perform this by using fuzzyKeyBytes copy and setting "non-fixed" position
- // values than otherwise.
- byte[] result =
- Arrays.copyOf(fuzzyKeyBytes, length > fuzzyKeyBytes.length ? length : fuzzyKeyBytes.length);
- if (reverse && length > fuzzyKeyBytes.length) {
- // we need trailing 0xff's instead of trailing 0x00's
- for (int i = fuzzyKeyBytes.length; i < result.length; i++) {
- result[i] = (byte) 0xFF;
- }
- }
- int toInc = -1;
- final Order order = Order.orderFor(reverse);
-
- boolean increased = false;
- for (int i = 0; i < result.length; i++) {
- if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* non-fixed */) {
- result[i] = row[offset + i];
- if (!order.isMax(row[offset + i])) {
- // this is "non-fixed" position and is not at max value, hence we can increase it
- toInc = i;
- }
- } else if (i < fuzzyKeyMeta.length && fuzzyKeyMeta[i] == -1 /* fixed */) {
- if (order.lt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 0xFF))) {
- // if setting value for any fixed position increased the original array,
- // we are OK
- increased = true;
- break;
- }
-
- if (order.gt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 0xFF))) {
- // if setting value for any fixed position makes array "smaller", then just stop:
- // in case we found some non-fixed position to increase we will do it, otherwise
- // there's no "next" row key that satisfies fuzzy rule and "greater" than given row
- break;
- }
- }
- }
-
- if (!increased) {
- if (toInc < 0) {
- return null;
- }
- result[toInc] = order.inc(result[toInc]);
-
- // Setting all "non-fixed" positions to zeroes to the right of the one we increased so
- // that found "next" row key is the smallest possible
- for (int i = toInc + 1; i < result.length; i++) {
- if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* non-fixed */) {
- result[i] = order.min();
- }
- }
- }
-
- return reverse? result: trimTrailingZeroes(result, fuzzyKeyMeta, toInc);
- }
-
- /**
- * For forward scanner, next cell hint should not contain any trailing zeroes
- * unless they are part of fuzzyKeyMeta
- * hint = '\x01\x01\x01\x00\x00'
- * will skip valid row '\x01\x01\x01'
- *
- * @param result
- * @param fuzzyKeyMeta
- * @param toInc - position of incremented byte
- * @return trimmed version of result
- */
-
- private static byte[] trimTrailingZeroes(byte[] result, byte[] fuzzyKeyMeta, int toInc) {
- int off = fuzzyKeyMeta.length >= result.length? result.length -1:
- fuzzyKeyMeta.length -1;
- for( ; off >= 0; off--){
- if(fuzzyKeyMeta[off] != 0) break;
- }
- if (off < toInc) off = toInc;
- byte[] retValue = new byte[off+1];
- System.arraycopy(result, 0, retValue, 0, retValue.length);
- return retValue;
- }
-
- /**
- * @return true if and only if the fields of the filter that are serialized are equal to the
- * corresponding fields in other. Used for testing.
- */
- boolean areSerializedFieldsEqual(Filter o) {
- if (o == this) return true;
- if (!(o instanceof FuzzyRowFilter)) return false;
-
- FuzzyRowFilter other = (FuzzyRowFilter) o;
- if (this.fuzzyKeysData.size() != other.fuzzyKeysData.size()) return false;
- for (int i = 0; i < fuzzyKeysData.size(); ++i) {
- Pair<byte[], byte[]> thisData = this.fuzzyKeysData.get(i);
- Pair<byte[], byte[]> otherData = other.fuzzyKeysData.get(i);
- if (!(Bytes.equals(thisData.getFirst(), otherData.getFirst()) && Bytes.equals(
- thisData.getSecond(), otherData.getSecond()))) {
- return false;
- }
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/bbdf433a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 05b34c7..0aee202 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.Bytes;
http://git-wip-us.apache.org/repos/asf/kylin/blob/bbdf433a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/FuzzyRowFilter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/FuzzyRowFilter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/FuzzyRowFilter.java
new file mode 100644
index 0000000..2c9579e
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/FuzzyRowFilter.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.UnsafeAccess;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * This is an optimized version of the standard FuzzyRowFilter. It filters data based on a fuzzy row key.
+ * Performs fast-forwards during scanning. It takes pairs (row key, fuzzy info) to match row keys.
+ * Where fuzzy info is a byte array with 0 or 1 as its values:
+ * <ul>
+ * <li>0 - means that this byte in provided row key is fixed, i.e. row key's byte at same position
+ * must match</li>
+ * <li>1 - means that this byte in provided row key is NOT fixed, i.e. row key's byte at this
+ * position can be different from the one in provided row key</li>
+ * </ul>
+ * Example: Let's assume row key format is userId_actionId_year_month. Length of userId is fixed and
+ * is 4, length of actionId is 2 and year and month are 4 and 2 bytes long respectively. Let's
+ * assume that we need to fetch all users that performed certain action (encoded as "99") in Jan of
+ * any year. Then the pair (row key, fuzzy info) would be the following: row key = "????_99_????_01"
+ * (one can use any value instead of "?") fuzzy info =
+ * "\x01\x01\x01\x01\x00\x00\x00\x00\x01\x01\x01\x01\x00\x00\x00" I.e. fuzzy info tells the matching
+ * mask is "????_99_????_01", where at ? can be any value.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class FuzzyRowFilter extends FilterBase {
+ private List<Pair<byte[], byte[]>> fuzzyKeysData;
+ private boolean done = false;
+
+ /**
+ * The index of the last successfully matched fuzzy key (in fuzzyKeysData). We will start
+ * matching the next KV with this one. If they do not match then we will fall back to the
+ * one-by-one iteration over fuzzyKeysData.
+ */
+ private int lastFoundIndex = -1;
+
+ /**
+ * Row tracker (keeps all next rows after SEEK_NEXT_USING_HINT was returned)
+ */
+ private RowTracker tracker;
+
+ /**
+  * Creates a filter from (row key, fuzzy info) pairs.
+  * <p>
+  * Every pair is copied before it is preprocessed, so the caller's list, pairs and byte arrays
+  * are left untouched. Preprocessing rewrites the copied mask (0 -> -1, 1 -> 0) and zeroes the
+  * copied search-key bytes at non-fixed positions.
+  *
+  * @param fuzzyKeysData pairs of (row key, fuzzy info); both arrays of a pair must have the
+  *          same length
+  * @throws IllegalArgumentException if the two arrays of any pair differ in length
+  */
+ public FuzzyRowFilter(List<Pair<byte[], byte[]>> fuzzyKeysData) {
+   List<Pair<byte[], byte[]>> preprocessed =
+       new ArrayList<Pair<byte[], byte[]>>(fuzzyKeysData.size());
+   for (Pair<byte[], byte[]> given : fuzzyKeysData) {
+     byte[] givenKey = given.getFirst();
+     byte[] givenMask = given.getSecond();
+     if (givenKey.length != givenMask.length) {
+       Pair<String, String> readable =
+           new Pair<String, String>(Bytes.toStringBinary(givenKey),
+               Bytes.toStringBinary(givenMask));
+       throw new IllegalArgumentException("Fuzzy pair lengths do not match: " + readable);
+     }
+     // Work on private copies: the preprocessing below mutates both arrays in place and must
+     // not leak into data the caller still owns (and may reuse for another filter).
+     Pair<byte[], byte[]> p =
+         new Pair<byte[], byte[]>(Arrays.copyOf(givenKey, givenKey.length),
+             Arrays.copyOf(givenMask, givenMask.length));
+     // update mask ( 0 -> -1 (0xff), 1 -> 0)
+     p.setSecond(preprocessMask(p.getSecond()));
+     preprocessSearchKey(p);
+     preprocessed.add(p);
+   }
+   // fuzzyKeysData must be assigned before the tracker is built: RowTracker reads its size.
+   this.fuzzyKeysData = preprocessed;
+   this.tracker = new RowTracker();
+ }
+
+ /**
+  * Zeroes, in place, every search-key byte whose mask byte is 0 (a non-fixed position in the
+  * preprocessed mask). No UnsafeAccess.unaligned() check is made: the rewrite is applied
+  * unconditionally.
+  *
+  * @param p pair of (search key, preprocessed mask); the key array is modified
+  */
+ private void preprocessSearchKey(Pair<byte[], byte[]> p) {
+   final byte[] searchKey = p.getFirst();
+   final byte[] processedMask = p.getSecond();
+   int pos = 0;
+   while (pos < processedMask.length) {
+     if (processedMask[pos] == 0) {
+       // set non-fixed part of a search key to 0.
+       searchKey[pos] = 0;
+     }
+     pos++;
+   }
+ }
+
+ /**
+  * Converts a mask to the internal representation, in place: a 0 byte becomes -1 (0xff, treated
+  * as a fixed position) and a 1 byte becomes 0 (treated as a non-fixed position). Any other byte
+  * value is left as it is. A mask that {@link #isPreprocessedMask(byte[])} already accepts is
+  * returned without changes. No UnsafeAccess.unaligned() check is made.
+  *
+  * @param mask mask to convert; modified in place
+  * @return the same array instance that was passed in
+  */
+ private byte[] preprocessMask(byte[] mask) {
+   if (!isPreprocessedMask(mask)) {
+     for (int pos = 0; pos < mask.length; pos++) {
+       switch (mask[pos]) {
+       case 0:
+         mask[pos] = -1; // 0 -> -1
+         break;
+       case 1:
+         mask[pos] = 0; // 1 -> 0
+         break;
+       default:
+         // anything else stays untouched
+         break;
+       }
+     }
+   }
+   return mask;
+ }
+
+ /**
+  * Tells whether a mask holds nothing but the internal values -1 and 0 (an empty mask
+  * qualifies too).
+  * <p>
+  * NOTE(review): a raw mask made only of 0 bytes also passes this check, so preprocessMask
+  * leaves it unconverted and its bytes are then read as non-fixed positions - confirm that
+  * callers never pass an "everything fixed" raw mask.
+  *
+  * @param mask mask to inspect
+  * @return false as soon as a byte other than -1 or 0 is found, true otherwise
+  */
+ private boolean isPreprocessedMask(byte[] mask) {
+   for (byte value : mask) {
+     final boolean internalValue = value == -1 || value == 0;
+     if (!internalValue) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Includes the cell when its row satisfies at least one fuzzy key, otherwise requests a seek.
+  * The search starts at the key that matched last time (if any) and wraps around the key list
+  * once.
+  *
+  * @param c cell whose row is tested
+  * @return INCLUDE on a match, SEEK_NEXT_USING_HINT when no key matches
+  */
+ @Override
+ public ReturnCode filterKeyValue(Cell c) {
+   final int keyCount = fuzzyKeysData.size();
+   final int first = Math.max(lastFoundIndex, 0);
+   for (int step = 0; step < keyCount; step++) {
+     final int index = (first + step) % keyCount;
+     final Pair<byte[], byte[]> fuzzyData = fuzzyKeysData.get(index);
+     final SatisfiesCode code =
+         satisfies(isReversed(), c.getRowArray(), c.getRowOffset(), c.getRowLength(),
+             fuzzyData.getFirst(), fuzzyData.getSecond());
+     if (code == SatisfiesCode.YES) {
+       // remember the winner so the next cell is tried against it first
+       lastFoundIndex = index;
+       return ReturnCode.INCLUDE;
+     }
+   }
+   // NOT FOUND -> seek next using hint
+   lastFoundIndex = -1;
+   return ReturnCode.SEEK_NEXT_USING_HINT;
+ }
+
+ /**
+  * Supplies the seek target after SEEK_NEXT_USING_HINT: the first possible cell on the smallest
+  * candidate row kept by the row tracker. When the tracker has no candidate left, the filter is
+  * flagged as done and no hint is given.
+  *
+  * @param currentCell the cell that was just rejected
+  * @return the hint cell, or null when no candidate row remains
+  */
+ @Override
+ public Cell getNextCellHint(Cell currentCell) {
+   if (!tracker.updateTracker(currentCell)) {
+     done = true;
+     return null;
+   }
+   return KeyValueUtil.createFirstOnRow(tracker.nextRow());
+ }
+
+ /**
+  * If we have multiple fuzzy keys, row tracker should improve overall performance. It calculates
+  * all next rows (one per every fuzzy key) and puts them (the fuzzy key is bundled) into a
+  * priority queue so that the smallest row key always appears at the queue head, which helps to
+  * decide the "Next Cell Hint". As scanning goes on, the number of candidate rows in the
+  * RowTracker will remain the size of fuzzy keys until some of the fuzzy keys can no longer have
+  * matches.
+  * <p>
+  * For a reversed scan the queue order is inverted, so the head is the largest row key.
+  */
+ private class RowTracker {
+   /** Candidate rows, each bundled with the fuzzy key (key bytes, mask) it was computed from. */
+   private final PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>> nextRows;
+   /** False until the first updateTracker call has seeded one candidate per fuzzy key. */
+   private boolean initialized = false;
+
+   RowTracker() {
+     // PriorityQueue rejects an initial capacity below 1 with IllegalArgumentException, which
+     // made a filter with an empty key list impossible to construct. Clamp the capacity.
+     final int capacity = Math.max(1, fuzzyKeysData.size());
+     nextRows =
+         new PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>>(capacity,
+             new Comparator<Pair<byte[], Pair<byte[], byte[]>>>() {
+               @Override
+               public int compare(Pair<byte[], Pair<byte[], byte[]>> o1,
+                   Pair<byte[], Pair<byte[], byte[]>> o2) {
+                 int compare = Bytes.compareTo(o1.getFirst(), o2.getFirst());
+                 // reversed scans want the largest row at the head
+                 return isReversed() ? -compare : compare;
+               }
+             });
+   }
+
+   /**
+    * @return the row key at the head of the queue (not removed)
+    * @throws IllegalStateException if the queue is empty
+    */
+   byte[] nextRow() {
+     if (nextRows.isEmpty()) {
+       throw new IllegalStateException(
+           "NextRows should not be empty, make sure to call nextRow() after updateTracker() return true");
+     }
+     return nextRows.peek().getFirst();
+   }
+
+   /**
+    * Brings the candidates up to date with the current cell. On the first call one candidate is
+    * computed per fuzzy key; afterwards every head candidate that the current cell has already
+    * reached or passed is replaced by a fresh candidate for the same fuzzy key (or dropped when
+    * that key has none).
+    *
+    * @return true while at least one candidate row remains
+    */
+   boolean updateTracker(Cell currentCell) {
+     if (!initialized) {
+       for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
+         updateWith(currentCell, fuzzyData);
+       }
+       initialized = true;
+     } else {
+       while (!nextRows.isEmpty() && !lessThan(currentCell, nextRows.peek().getFirst())) {
+         Pair<byte[], Pair<byte[], byte[]>> head = nextRows.poll();
+         Pair<byte[], byte[]> fuzzyData = head.getSecond();
+         updateWith(currentCell, fuzzyData);
+       }
+     }
+     return !nextRows.isEmpty();
+   }
+
+   /**
+    * @return true when the current cell's row comes strictly before nextRowKey in scan order
+    *         (byte-wise smaller for a forward scan, larger for a reversed one)
+    */
+   boolean lessThan(Cell currentCell, byte[] nextRowKey) {
+     int compareResult =
+         Bytes.compareTo(currentCell.getRowArray(), currentCell.getRowOffset(),
+             currentCell.getRowLength(), nextRowKey, 0, nextRowKey.length);
+     return (!isReversed() && compareResult < 0) || (isReversed() && compareResult > 0);
+   }
+
+   /**
+    * Computes the next candidate row for one fuzzy key relative to the current cell and queues
+    * it; nothing is queued when getNextForFuzzyRule returns null.
+    */
+   void updateWith(Cell currentCell, Pair<byte[], byte[]> fuzzyData) {
+     byte[] nextRowKeyCandidate =
+         getNextForFuzzyRule(isReversed(), currentCell.getRowArray(), currentCell.getRowOffset(),
+             currentCell.getRowLength(), fuzzyData.getFirst(), fuzzyData.getSecond());
+     if (nextRowKeyCandidate != null) {
+       nextRows.add(new Pair<byte[], Pair<byte[], byte[]>>(nextRowKeyCandidate, fuzzyData));
+     }
+   }
+
+ }
+
+ /**
+  * Reports the value of {@code done}, which getNextCellHint sets once the row tracker has no
+  * candidate row left.
+  *
+  * @return true when the filter has flagged itself as done
+  */
+ @Override
+ public boolean filterAllRemaining() {
+ return done;
+ }
+
+ /**
+  * Serializes every (key, mask) pair held by the filter, in list order, into a
+  * FilterProtos.FuzzyRowFilter message. The arrays are written as currently stored, i.e. after
+  * the constructor's preprocessing.
+  *
+  * @return The filter serialized using pb
+  */
+ public byte[] toByteArray() {
+   final FilterProtos.FuzzyRowFilter.Builder filterBuilder =
+       FilterProtos.FuzzyRowFilter.newBuilder();
+   for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
+     filterBuilder.addFuzzyKeysData(BytesBytesPair.newBuilder()
+         .setFirst(ByteStringer.wrap(fuzzyData.getFirst()))
+         .setSecond(ByteStringer.wrap(fuzzyData.getSecond())));
+   }
+   return filterBuilder.build().toByteArray();
+ }
+
+ /**
+  * Rebuilds a filter from its protobuf form by reading every (key, mask) pair of the message
+  * and handing them to the constructor.
+  *
+  * @param pbBytes A pb serialized {@link FuzzyRowFilter} instance
+  * @return An instance of {@link FuzzyRowFilter} made from <code>bytes</code>
+  * @throws DeserializationException if the bytes are not a valid protobuf message
+  * @see #toByteArray
+  */
+ public static FuzzyRowFilter parseFrom(final byte[] pbBytes) throws DeserializationException {
+   final FilterProtos.FuzzyRowFilter proto;
+   try {
+     proto = FilterProtos.FuzzyRowFilter.parseFrom(pbBytes);
+   } catch (InvalidProtocolBufferException e) {
+     throw new DeserializationException(e);
+   }
+   final int pairCount = proto.getFuzzyKeysDataCount();
+   final ArrayList<Pair<byte[], byte[]>> pairs = new ArrayList<Pair<byte[], byte[]>>(pairCount);
+   int idx = 0;
+   while (idx < pairCount) {
+     final BytesBytesPair entry = proto.getFuzzyKeysData(idx);
+     pairs.add(new Pair<byte[], byte[]>(entry.getFirst().toByteArray(),
+         entry.getSecond().toByteArray()));
+     idx++;
+   }
+   return new FuzzyRowFilter(pairs);
+ }
+
+ /**
+  * Renders the filter as {@code FuzzyRowFilter{fuzzyKeysData={key:mask}...}, } with keys and
+  * masks printed through Bytes.toStringBinary.
+  * NOTE(review): the text ends with a dangling ", " - kept as is so the output does not change.
+  */
+ @Override
+ public String toString() {
+   final StringBuilder text = new StringBuilder();
+   text.append("FuzzyRowFilter").append("{fuzzyKeysData=");
+   for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
+     final String key = Bytes.toStringBinary(fuzzyData.getFirst());
+     final String mask = Bytes.toStringBinary(fuzzyData.getSecond());
+     text.append('{').append(key).append(":").append(mask).append('}');
+   }
+   return text.append("}, ").toString();
+ }
+
+ // Utility methods
+
+ /** Outcome of testing one row against one fuzzy rule. */
+ enum SatisfiesCode {
+   /** The row satisfies the fuzzy rule. */
+   YES,
+
+   /** The row does not satisfy the fuzzy rule, but a greater row that does may exist. */
+   NEXT_EXISTS,
+
+   /** The row does not satisfy the fuzzy rule and no greater row does either. */
+   NO_NEXT
+ }
+
+ /**
+  * Test helper: checks a whole row against a fuzzy key for a forward (non-reversed) scan.
+  */
+ @VisibleForTesting
+ static SatisfiesCode satisfies(byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+   final boolean forwardScan = false;
+   final int rowLength = row.length;
+   return satisfies(forwardScan, row, 0, rowLength, fuzzyKeyBytes, fuzzyKeyMeta);
+ }
+
+ /**
+  * Test helper: checks a whole row (offset 0, full length) against a fuzzy key for the given
+  * scan direction.
+  */
+ @VisibleForTesting
+ static SatisfiesCode satisfies(boolean reverse, byte[] row, byte[] fuzzyKeyBytes,
+     byte[] fuzzyKeyMeta) {
+   final int rowOffset = 0;
+   final int rowLength = row.length;
+   return satisfies(reverse, row, rowOffset, rowLength, fuzzyKeyBytes, fuzzyKeyMeta);
+ }
+
+ /**
+  * Tests the first min(length, fuzzyKeyBytes.length) bytes of a row against a fuzzy key by
+  * checking {@code (row & mask) == key} chunk by chunk: 8 bytes at a time, then at most one
+  * 4-byte, one 2-byte and one 1-byte chunk for the tail.
+  * <p>
+  * The comparison only works with the form the constructor produces: mask bytes of -1 (0xff)
+  * at fixed positions and 0 at non-fixed positions, and key bytes zeroed at non-fixed
+  * positions. The {@code reverse} argument is not read by this implementation, and NO_NEXT is
+  * never returned.
+  *
+  * @param reverse scan direction (unused here)
+  * @param row array holding the row key; null is treated as a match
+  * @param offset index of the first row-key byte in {@code row}
+  * @param length number of row-key bytes
+  * @param fuzzyKeyBytes preprocessed search key
+  * @param fuzzyKeyMeta preprocessed mask
+  * @return YES when every compared chunk matches, NEXT_EXISTS otherwise
+  */
+ static SatisfiesCode satisfies(boolean reverse, byte[] row, int offset, int length,
+ byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+
+// if (UnsafeAccess.unaligned() == false) {
+// return satisfiesNoUnsafe(reverse, row, offset, length, fuzzyKeyBytes, fuzzyKeyMeta);
+// }
+
+ if (row == null) {
+ // do nothing, let scan to proceed
+ return SatisfiesCode.YES;
+ }
+ // never compare past the end of the fuzzy key
+ length = Math.min(length, fuzzyKeyBytes.length);
+ int numWords = length / Bytes.SIZEOF_LONG;
+ // raw address of the first row-key byte inside the row array
+ int offsetAdj = offset + UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
+
+ int j = numWords << 3; // numWords * SIZEOF_LONG;
+
+ // whole 8-byte words first
+ for (int i = 0; i < j; i += Bytes.SIZEOF_LONG) {
+
+ long fuzzyBytes =
+ UnsafeAccess.theUnsafe.getLong(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+ + (long) i);
+ long fuzzyMeta =
+ UnsafeAccess.theUnsafe.getLong(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+ + (long) i);
+ long rowValue = UnsafeAccess.theUnsafe.getLong(row, offsetAdj + (long) i);
+ if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+ // We always return NEXT_EXISTS
+ return SatisfiesCode.NEXT_EXISTS;
+ }
+ }
+
+ // off tracks how many bytes have been compared so far
+ int off = j;
+
+ // tail: one 4-byte chunk if at least 4 bytes remain
+ if (length - off >= Bytes.SIZEOF_INT) {
+ int fuzzyBytes =
+ UnsafeAccess.theUnsafe.getInt(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+ + (long) off);
+ int fuzzyMeta =
+ UnsafeAccess.theUnsafe.getInt(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+ + (long) off);
+ int rowValue = UnsafeAccess.theUnsafe.getInt(row, offsetAdj + (long) off);
+ if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+ // We always return NEXT_EXISTS
+ return SatisfiesCode.NEXT_EXISTS;
+ }
+ off += Bytes.SIZEOF_INT;
+ }
+
+ // tail: one 2-byte chunk if at least 2 bytes remain
+ if (length - off >= Bytes.SIZEOF_SHORT) {
+ short fuzzyBytes =
+ UnsafeAccess.theUnsafe.getShort(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+ + (long) off);
+ short fuzzyMeta =
+ UnsafeAccess.theUnsafe.getShort(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+ + (long) off);
+ short rowValue = UnsafeAccess.theUnsafe.getShort(row, offsetAdj + (long) off);
+ if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+ // We always return NEXT_EXISTS
+ // even if it does not (in this case getNextForFuzzyRule
+ // will return null)
+ return SatisfiesCode.NEXT_EXISTS;
+ }
+ off += Bytes.SIZEOF_SHORT;
+ }
+
+ // tail: the last single byte, read through plain array access
+ if (length - off >= Bytes.SIZEOF_BYTE) {
+ int fuzzyBytes = fuzzyKeyBytes[off] & 0xff;
+ int fuzzyMeta = fuzzyKeyMeta[off] & 0xff;
+ int rowValue = row[offset + off] & 0xff;
+ if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+ // We always return NEXT_EXISTS
+ return SatisfiesCode.NEXT_EXISTS;
+ }
+ }
+ return SatisfiesCode.YES;
+ }
+
+ /**
+  * Byte-by-byte variant of the fuzzy-rule check that does not use Unsafe reads.
+  * <p>
+  * Unlike the word-based satisfies, this method reads the mask in its raw form: a mask byte of
+  * 0 marks a fixed position and a mask byte of 1 marks a non-fixed one. Within this class its
+  * only call site is the commented-out guard at the top of the word-based satisfies.
+  *
+  * @param reverse true for a reversed scan
+  * @param row array holding the row key; null is treated as a match
+  * @param offset index of the first row-key byte in {@code row}
+  * @param length number of row-key bytes
+  * @param fuzzyKeyBytes search key
+  * @param fuzzyKeyMeta raw mask (0 = fixed, 1 = non-fixed)
+  * @return YES if no fixed position mismatches; otherwise NEXT_EXISTS or NO_NEXT depending on
+  *         whether a later matching row can still exist in scan direction
+  */
+ static SatisfiesCode satisfiesNoUnsafe(boolean reverse, byte[] row, int offset, int length,
+ byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+ if (row == null) {
+ // do nothing, let scan to proceed
+ return SatisfiesCode.YES;
+ }
+
+ Order order = Order.orderFor(reverse);
+ // becomes true once a non-fixed position that is not at its maximum has been passed
+ boolean nextRowKeyCandidateExists = false;
+
+ for (int i = 0; i < fuzzyKeyMeta.length && i < length; i++) {
+ // First, checking if this position is fixed and not equals the given one
+ boolean byteAtPositionFixed = fuzzyKeyMeta[i] == 0;
+ boolean fixedByteIncorrect = byteAtPositionFixed && fuzzyKeyBytes[i] != row[i + offset];
+ if (fixedByteIncorrect) {
+ // in this case there's another row that satisfies fuzzy rule and bigger than this row
+ if (nextRowKeyCandidateExists) {
+ return SatisfiesCode.NEXT_EXISTS;
+ }
+
+ // If this row byte is less than fixed then there's a byte array bigger than
+ // this row and which satisfies the fuzzy rule. Otherwise there's no such byte array:
+ // this row is simply bigger than any byte array that satisfies the fuzzy rule
+ boolean rowByteLessThanFixed = (row[i + offset] & 0xFF) < (fuzzyKeyBytes[i] & 0xFF);
+ if (rowByteLessThanFixed && !reverse) {
+ return SatisfiesCode.NEXT_EXISTS;
+ } else if (!rowByteLessThanFixed && reverse) {
+ return SatisfiesCode.NEXT_EXISTS;
+ } else {
+ return SatisfiesCode.NO_NEXT;
+ }
+ }
+
+ // Second, checking if this position is not fixed and byte value is not the biggest. In this
+ // case there's a byte array bigger than this row and which satisfies the fuzzy rule. To get
+ // bigger byte array that satisfies the rule we need to just increase this byte
+ // (see the code of getNextForFuzzyRule below) by one.
+ // Note: if non-fixed byte is already at biggest value, this doesn't allow us to say there's
+ // bigger one that satisfies the rule as it can't be increased.
+ // NOTE(review): the "is max" test reads fuzzyKeyBytes[i], not the row byte - confirm intent.
+ if (fuzzyKeyMeta[i] == 1 && !order.isMax(fuzzyKeyBytes[i])) {
+ nextRowKeyCandidateExists = true;
+ }
+ }
+ return SatisfiesCode.YES;
+ }
+
+ /**
+  * Test helper: computes the next candidate row for a forward (non-reversed) scan, taking the
+  * whole {@code row} array as the row key.
+  */
+ @VisibleForTesting
+ static byte[] getNextForFuzzyRule(byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+   final boolean forwardScan = false;
+   final int rowLength = row.length;
+   return getNextForFuzzyRule(forwardScan, row, 0, rowLength, fuzzyKeyBytes, fuzzyKeyMeta);
+ }
+
+ /**
+  * Test helper: computes the next candidate row for the given scan direction, taking the whole
+  * {@code row} array (offset 0, full length) as the row key.
+  */
+ @VisibleForTesting
+ static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, byte[] fuzzyKeyBytes,
+     byte[] fuzzyKeyMeta) {
+   final int rowOffset = 0;
+   final int rowLength = row.length;
+   return getNextForFuzzyRule(reverse, row, rowOffset, rowLength, fuzzyKeyBytes, fuzzyKeyMeta);
+ }
+
+ /**
+  * Abstracts directional comparisons based on scan direction, so that the same code can walk
+  * rows forwards (ASC) or backwards (DESC).
+  */
+ private enum Order {
+   /** Forward scan: plain numeric comparisons, 0xff is the largest byte and 0 the smallest. */
+   ASC {
+     @Override
+     public boolean lt(int lhs, int rhs) {
+       return lhs < rhs;
+     }
+
+     @Override
+     public boolean gt(int lhs, int rhs) {
+       return lhs > rhs;
+     }
+
+     @Override
+     public byte inc(byte val) {
+       // TODO: what about over/underflow?
+       return (byte) (val + 1);
+     }
+
+     @Override
+     public boolean isMax(byte val) {
+       return val == (byte) 0xff;
+     }
+
+     @Override
+     public byte min() {
+       return 0;
+     }
+   },
+
+   /** Reversed scan: comparisons are flipped, 0 is the "largest" byte and 0xff the "smallest". */
+   DESC {
+     @Override
+     public boolean lt(int lhs, int rhs) {
+       return lhs > rhs;
+     }
+
+     @Override
+     public boolean gt(int lhs, int rhs) {
+       return lhs < rhs;
+     }
+
+     @Override
+     public byte inc(byte val) {
+       // TODO: what about over/underflow?
+       return (byte) (val - 1);
+     }
+
+     @Override
+     public boolean isMax(byte val) {
+       return val == 0;
+     }
+
+     @Override
+     public byte min() {
+       return (byte) 0xFF;
+     }
+   };
+
+   /** Picks DESC for a reversed scan and ASC otherwise. */
+   public static Order orderFor(boolean reverse) {
+     if (reverse) {
+       return DESC;
+     }
+     return ASC;
+   }
+
+   /** Returns true when {@code lhs} comes before {@code rhs} in this ordering. */
+   public abstract boolean lt(int lhs, int rhs);
+
+   /** Returns true when {@code lhs} comes after {@code rhs} in this ordering. */
+   public abstract boolean gt(int lhs, int rhs);
+
+   /** Returns {@code val} moved one step forward in this ordering (no overflow check). */
+   public abstract byte inc(byte val);
+
+   /** Returns true when {@code val} is the last value of this ordering. */
+   public abstract boolean isMax(byte val);
+
+   /** Returns the first value of this ordering. */
+   public abstract byte min();
+ }
+
+ /**
+  * Computes the closest row key after the given row (in scan direction) that can satisfy the
+  * fuzzy rule. The mask is read in its preprocessed form: 0 marks a non-fixed position and -1 a
+  * fixed one.
+  * <p>
+  * NOTE(review): the result is as long as the longer of {@code length} and
+  * {@code fuzzyKeyBytes.length}, and the loop reads {@code row[offset + i]} for every index of
+  * the result - when the fuzzy key is longer than the row this reads past the row's own bytes;
+  * confirm callers never hit that case.
+  *
+  * @param reverse true for a reversed scan
+  * @param row array holding the current row key
+  * @param offset index of the first row-key byte in {@code row}
+  * @param length number of row-key bytes
+  * @param fuzzyKeyBytes preprocessed search key
+  * @param fuzzyKeyMeta preprocessed mask
+  * @return greater byte array than given (row) which satisfies the fuzzy rule if it exists, null
+  * otherwise
+  */
+ @VisibleForTesting
+ static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, int offset, int length,
+ byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+ // To find out the next "smallest" byte array that satisfies fuzzy rule and "greater" than
+ // the given one we do the following:
+ // 1. setting values on all "fixed" positions to the values from fuzzyKeyBytes
+ // 2. if during the first step given row did not increase, then we increase the value at
+ // the first "non-fixed" position (where it is not maximum already)
+
+ // It is easier to perform this by using fuzzyKeyBytes copy and setting "non-fixed" position
+ // values than otherwise.
+ byte[] result =
+ Arrays.copyOf(fuzzyKeyBytes, length > fuzzyKeyBytes.length ? length : fuzzyKeyBytes.length);
+ if (reverse && length > fuzzyKeyBytes.length) {
+ // we need trailing 0xff's instead of trailing 0x00's
+ for (int i = fuzzyKeyBytes.length; i < result.length; i++) {
+ result[i] = (byte) 0xFF;
+ }
+ }
+ // index of the right-most non-fixed position seen so far that can still be incremented
+ int toInc = -1;
+ final Order order = Order.orderFor(reverse);
+
+ // true once a fixed position alone already moves the result past the row
+ boolean increased = false;
+ for (int i = 0; i < result.length; i++) {
+ if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* non-fixed */) {
+ result[i] = row[offset + i];
+ if (!order.isMax(row[offset + i])) {
+ // this is "non-fixed" position and is not at max value, hence we can increase it
+ toInc = i;
+ }
+ } else if (i < fuzzyKeyMeta.length && fuzzyKeyMeta[i] == -1 /* fixed */) {
+ if (order.lt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 0xFF))) {
+ // if setting value for any fixed position increased the original array,
+ // we are OK
+ increased = true;
+ break;
+ }
+
+ if (order.gt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 0xFF))) {
+ // if setting value for any fixed position makes array "smaller", then just stop:
+ // in case we found some non-fixed position to increase we will do it, otherwise
+ // there's no "next" row key that satisfies fuzzy rule and "greater" than given row
+ break;
+ }
+ }
+ }
+
+ if (!increased) {
+ if (toInc < 0) {
+ // nothing can be incremented: no later row can match this fuzzy key
+ return null;
+ }
+ result[toInc] = order.inc(result[toInc]);
+
+ // Setting all "non-fixed" positions to zeroes to the right of the one we increased so
+ // that found "next" row key is the smallest possible
+ for (int i = toInc + 1; i < result.length; i++) {
+ if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* non-fixed */) {
+ result[i] = order.min();
+ }
+ }
+ }
+
+ // only the forward-scan hint is trimmed; the reversed one is returned at full length
+ return reverse? result: trimTrailingZeroes(result, fuzzyKeyMeta, toInc);
+ }
+
+ /**
+  * Shortens a forward-scan hint by cutting off its tail of non-fixed positions (mask byte 0),
+  * while always keeping everything up to and including the incremented byte. Without this, a
+  * hint such as '\x01\x01\x01\x00\x00' would skip the valid row '\x01\x01\x01'.
+  *
+  * @param result candidate next row key
+  * @param fuzzyKeyMeta preprocessed mask (0 = non-fixed)
+  * @param toInc position of the incremented byte, or -1 if none was incremented
+  * @return a new array holding the kept prefix of {@code result}
+  */
+ private static byte[] trimTrailingZeroes(byte[] result, byte[] fuzzyKeyMeta, int toInc) {
+   // start at the last index covered by both the result and the mask
+   int last = Math.min(fuzzyKeyMeta.length, result.length) - 1;
+   while (last >= 0 && fuzzyKeyMeta[last] == 0) {
+     last--;
+   }
+   // never cut the incremented byte away
+   last = Math.max(last, toInc);
+   final byte[] trimmed = new byte[last + 1];
+   System.arraycopy(result, 0, trimmed, 0, trimmed.length);
+   return trimmed;
+ }
+
+ /**
+  * Compares the serialized state of two filters: both must hold the same number of fuzzy
+  * pairs, and the pairs at equal positions must have byte-wise equal keys and masks. Used for
+  * testing.
+  *
+  * @param o filter to compare with
+  * @return true if and only if the fields of the filter that are serialized are equal to the
+  *         corresponding fields in other
+  */
+ boolean areSerializedFieldsEqual(Filter o) {
+   if (o == this) {
+     return true;
+   }
+   if (!(o instanceof FuzzyRowFilter)) {
+     return false;
+   }
+
+   final List<Pair<byte[], byte[]>> mine = this.fuzzyKeysData;
+   final List<Pair<byte[], byte[]>> theirs = ((FuzzyRowFilter) o).fuzzyKeysData;
+   if (mine.size() != theirs.size()) {
+     return false;
+   }
+   for (int i = 0; i < mine.size(); ++i) {
+     final Pair<byte[], byte[]> thisData = mine.get(i);
+     final Pair<byte[], byte[]> otherData = theirs.get(i);
+     if (!Bytes.equals(thisData.getFirst(), otherData.getFirst())) {
+       return false;
+     }
+     if (!Bytes.equals(thisData.getSecond(), otherData.getSecond())) {
+       return false;
+     }
+   }
+   return true;
+ }
+}