You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/03/07 10:06:59 UTC

kylin git commit: Rename FuzzyRowFilter to org/apache/kylin/storage/hbase/cube/v2/FuzzyRowFilter.java

Repository: kylin
Updated Branches:
  refs/heads/shaofeng-hbasefuzzyfilter 13fc3e648 -> bbdf433af


Rename FuzzyRowFilter to org/apache/kylin/storage/hbase/cube/v2/FuzzyRowFilter.java


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bbdf433a
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bbdf433a
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bbdf433a

Branch: refs/heads/shaofeng-hbasefuzzyfilter
Commit: bbdf433af915105333a67d4881715a88758a66c3
Parents: 13fc3e6
Author: shaofengshi <sh...@apache.org>
Authored: Tue Mar 7 18:06:49 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Mar 7 18:06:49 2017 +0800

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 .../hadoop/hbase/filter/FuzzyRowFilter.java     | 636 ------------------
 .../storage/hbase/cube/v2/CubeHBaseRPC.java     |   1 -
 .../storage/hbase/cube/v2/FuzzyRowFilter.java   | 638 +++++++++++++++++++
 4 files changed, 639 insertions(+), 638 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/bbdf433a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0ee5f03..6d3425e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
         <hive-hcatalog.version>1.2.1</hive-hcatalog.version>
 
         <!-- HBase versions -->
-        <hbase-hadoop2.version>1.1.2</hbase-hadoop2.version>
+        <hbase-hadoop2.version>1.1.1</hbase-hadoop2.version>
         <kafka.version>0.10.0.0</kafka.version>
 
         <!-- Hadoop deps, keep compatible with hadoop2.version -->

http://git-wip-us.apache.org/repos/asf/kylin/blob/bbdf433a/storage-hbase/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java b/storage-hbase/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java
deleted file mode 100644
index e34ebc4..0000000
--- a/storage-hbase/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java
+++ /dev/null
@@ -1,636 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.hadoop.hbase.filter;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
-import org.apache.hadoop.hbase.util.ByteStringer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.UnsafeAccess;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-/**
- * This is optimized version of a standard FuzzyRowFilter Filters data based on fuzzy row key.
- * Performs fast-forwards during scanning. It takes pairs (row key, fuzzy info) to match row keys.
- * Where fuzzy info is a byte array with 0 or 1 as its values:
- * <ul>
- * <li>0 - means that this byte in provided row key is fixed, i.e. row key's byte at same position
- * must match</li>
- * <li>1 - means that this byte in provided row key is NOT fixed, i.e. row key's byte at this
- * position can be different from the one in provided row key</li>
- * </ul>
- * Example: Let's assume row key format is userId_actionId_year_month. Length of userId is fixed and
- * is 4, length of actionId is 2 and year and month are 4 and 2 bytes long respectively. Let's
- * assume that we need to fetch all users that performed certain action (encoded as "99") in Jan of
- * any year. Then the pair (row key, fuzzy info) would be the following: row key = "????_99_????_01"
- * (one can use any value instead of "?") fuzzy info =
- * "\x01\x01\x01\x01\x00\x00\x00\x00\x01\x01\x01\x01\x00\x00\x00" I.e. fuzzy info tells the matching
- * mask is "????_99_????_01", where at ? can be any value.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public class FuzzyRowFilter extends FilterBase {
-  private List<Pair<byte[], byte[]>> fuzzyKeysData;
-  private boolean done = false;
-
-  /**
-   * The index of a last successfully found matching fuzzy string (in fuzzyKeysData). We will start
-   * matching next KV with this one. If they do not match then we will return back to the one-by-one
-   * iteration over fuzzyKeysData.hbas
-   */
-  private int lastFoundIndex = -1;
-
-  /**
-   * Row tracker (keeps all next rows after SEEK_NEXT_USING_HINT was returned)
-   */
-  private RowTracker tracker;
-
-  public FuzzyRowFilter(List<Pair<byte[], byte[]>> fuzzyKeysData) {
-    Pair<byte[], byte[]> p;
-    for (int i = 0; i < fuzzyKeysData.size(); i++) {
-      p = fuzzyKeysData.get(i);
-      if (p.getFirst().length != p.getSecond().length) {
-        Pair<String, String> readable =
-            new Pair<String, String>(Bytes.toStringBinary(p.getFirst()), Bytes.toStringBinary(p
-                .getSecond()));
-        throw new IllegalArgumentException("Fuzzy pair lengths do not match: " + readable);
-      }
-      // update mask ( 0 -> -1 (0xff), 1 -> 0)
-      p.setSecond(preprocessMask(p.getSecond()));
-      preprocessSearchKey(p);
-    }
-    this.fuzzyKeysData = fuzzyKeysData;
-    this.tracker = new RowTracker();
-  }
-
-  private void preprocessSearchKey(Pair<byte[], byte[]> p) {
-//    if (UnsafeAccess.unaligned() == false) {
-//      return;
-//    }
-    byte[] key = p.getFirst();
-    byte[] mask = p.getSecond();
-    for (int i = 0; i < mask.length; i++) {
-      // set non-fixed part of a search key to 0.
-      if (mask[i] == 0) key[i] = 0;
-    }
-  }
-
-  /**
-   * We need to preprocess mask array, as since we treat 0's as unfixed positions and -1 (0xff) as
-   * fixed positions
-   * @param mask
-   * @return mask array
-   */
-  private byte[] preprocessMask(byte[] mask) {
-//    if (UnsafeAccess.unaligned() == false) {
-//      return mask;
-//    }
-    if (isPreprocessedMask(mask)) return mask;
-    for (int i = 0; i < mask.length; i++) {
-      if (mask[i] == 0) {
-        mask[i] = -1; // 0 -> -1
-      } else if (mask[i] == 1) {
-        mask[i] = 0;// 1 -> 0
-      }
-    }
-    return mask;
-  }
-
-  private boolean isPreprocessedMask(byte[] mask) {
-    for (int i = 0; i < mask.length; i++) {
-      if (mask[i] != -1 && mask[i] != 0) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public ReturnCode filterKeyValue(Cell c) {
-    final int startIndex = lastFoundIndex >= 0 ? lastFoundIndex : 0;
-    final int size = fuzzyKeysData.size();
-    for (int i = startIndex; i < size + startIndex; i++) {
-      final int index = i % size;
-      Pair<byte[], byte[]> fuzzyData = fuzzyKeysData.get(index);
-      SatisfiesCode satisfiesCode =
-          satisfies(isReversed(), c.getRowArray(), c.getRowOffset(), c.getRowLength(),
-            fuzzyData.getFirst(), fuzzyData.getSecond());
-      if (satisfiesCode == SatisfiesCode.YES) {
-        lastFoundIndex = index;
-        return ReturnCode.INCLUDE;
-      }
-    }
-    // NOT FOUND -> seek next using hint
-    lastFoundIndex = -1;
-
-    return ReturnCode.SEEK_NEXT_USING_HINT;
-
-  }
-
-  @Override
-  public Cell getNextCellHint(Cell currentCell) {
-    boolean result = tracker.updateTracker(currentCell);
-    if (result == false) {
-      done = true;
-      return null;
-    }
-    byte[] nextRowKey = tracker.nextRow();
-    return KeyValueUtil.createFirstOnRow(nextRowKey);
-  }
-
-  /**
-   * If we have multiple fuzzy keys, row tracker should improve overall performance. It calculates
-   * all next rows (one per every fuzzy key) and put them (the fuzzy key is bundled) into a priority
-   * queue so that the smallest row key always appears at queue head, which helps to decide the
-   * "Next Cell Hint". As scanning going on, the number of candidate rows in the RowTracker will
-   * remain the size of fuzzy keys until some of the fuzzy keys won't possibly have matches any
-   * more.
-   */
-  private class RowTracker {
-    private final PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>> nextRows;
-    private boolean initialized = false;
-
-    RowTracker() {
-      nextRows =
-          new PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>>(fuzzyKeysData.size(),
-              new Comparator<Pair<byte[], Pair<byte[], byte[]>>>() {
-                @Override
-                public int compare(Pair<byte[], Pair<byte[], byte[]>> o1,
-                    Pair<byte[], Pair<byte[], byte[]>> o2) {
-                  int compare = Bytes.compareTo(o1.getFirst(), o2.getFirst());
-                  if (!isReversed()) {
-                    return compare;
-                  } else {
-                    return -compare;
-                  }
-                }
-              });
-    }
-
-    byte[] nextRow() {
-      if (nextRows.isEmpty()) {
-        throw new IllegalStateException(
-            "NextRows should not be empty, make sure to call nextRow() after updateTracker() return true");
-      } else {
-        return nextRows.peek().getFirst();
-      }
-    }
-
-    boolean updateTracker(Cell currentCell) {
-      if (!initialized) {
-        for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
-          updateWith(currentCell, fuzzyData);
-        }
-        initialized = true;
-      } else {
-        while (!nextRows.isEmpty() && !lessThan(currentCell, nextRows.peek().getFirst())) {
-          Pair<byte[], Pair<byte[], byte[]>> head = nextRows.poll();
-          Pair<byte[], byte[]> fuzzyData = head.getSecond();
-          updateWith(currentCell, fuzzyData);
-        }
-      }
-      return !nextRows.isEmpty();
-    }
-
-    boolean lessThan(Cell currentCell, byte[] nextRowKey) {
-      int compareResult =
-          Bytes.compareTo(currentCell.getRowArray(), currentCell.getRowOffset(),
-            currentCell.getRowLength(), nextRowKey, 0, nextRowKey.length);
-      return (!isReversed() && compareResult < 0) || (isReversed() && compareResult > 0);
-    }
-
-    void updateWith(Cell currentCell, Pair<byte[], byte[]> fuzzyData) {
-      byte[] nextRowKeyCandidate =
-          getNextForFuzzyRule(isReversed(), currentCell.getRowArray(), currentCell.getRowOffset(),
-            currentCell.getRowLength(), fuzzyData.getFirst(), fuzzyData.getSecond());
-      if (nextRowKeyCandidate != null) {
-        nextRows.add(new Pair<byte[], Pair<byte[], byte[]>>(nextRowKeyCandidate, fuzzyData));
-      }
-    }
-
-  }
-
-  @Override
-  public boolean filterAllRemaining() {
-    return done;
-  }
-
-  /**
-   * @return The filter serialized using pb
-   */
-  public byte[] toByteArray() {
-    FilterProtos.FuzzyRowFilter.Builder builder = FilterProtos.FuzzyRowFilter.newBuilder();
-    for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
-      BytesBytesPair.Builder bbpBuilder = BytesBytesPair.newBuilder();
-      bbpBuilder.setFirst(ByteStringer.wrap(fuzzyData.getFirst()));
-      bbpBuilder.setSecond(ByteStringer.wrap(fuzzyData.getSecond()));
-      builder.addFuzzyKeysData(bbpBuilder);
-    }
-    return builder.build().toByteArray();
-  }
-
-  /**
-   * @param pbBytes A pb serialized {@link FuzzyRowFilter} instance
-   * @return An instance of {@link FuzzyRowFilter} made from <code>bytes</code>
-   * @throws DeserializationException
-   * @see #toByteArray
-   */
-  public static FuzzyRowFilter parseFrom(final byte[] pbBytes) throws DeserializationException {
-    FilterProtos.FuzzyRowFilter proto;
-    try {
-      proto = FilterProtos.FuzzyRowFilter.parseFrom(pbBytes);
-    } catch (InvalidProtocolBufferException e) {
-      throw new DeserializationException(e);
-    }
-    int count = proto.getFuzzyKeysDataCount();
-    ArrayList<Pair<byte[], byte[]>> fuzzyKeysData = new ArrayList<Pair<byte[], byte[]>>(count);
-    for (int i = 0; i < count; ++i) {
-      BytesBytesPair current = proto.getFuzzyKeysData(i);
-      byte[] keyBytes = current.getFirst().toByteArray();
-      byte[] keyMeta = current.getSecond().toByteArray();
-      fuzzyKeysData.add(new Pair<byte[], byte[]>(keyBytes, keyMeta));
-    }
-    return new FuzzyRowFilter(fuzzyKeysData);
-  }
-
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("FuzzyRowFilter");
-    sb.append("{fuzzyKeysData=");
-    for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
-      sb.append('{').append(Bytes.toStringBinary(fuzzyData.getFirst())).append(":");
-      sb.append(Bytes.toStringBinary(fuzzyData.getSecond())).append('}');
-    }
-    sb.append("}, ");
-    return sb.toString();
-  }
-
-  // Utility methods
-
-  static enum SatisfiesCode {
-    /** row satisfies fuzzy rule */
-    YES,
-    /** row doesn't satisfy fuzzy rule, but there's possible greater row that does */
-    NEXT_EXISTS,
-    /** row doesn't satisfy fuzzy rule and there's no greater row that does */
-    NO_NEXT
-  }
-
-  @VisibleForTesting
-  static SatisfiesCode satisfies(byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
-    return satisfies(false, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
-  }
-
-  @VisibleForTesting
-  static SatisfiesCode satisfies(boolean reverse, byte[] row, byte[] fuzzyKeyBytes,
-      byte[] fuzzyKeyMeta) {
-    return satisfies(reverse, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
-  }
-
-  static SatisfiesCode satisfies(boolean reverse, byte[] row, int offset, int length,
-      byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
-
-//    if (UnsafeAccess.unaligned() == false) {
-//      return satisfiesNoUnsafe(reverse, row, offset, length, fuzzyKeyBytes, fuzzyKeyMeta);
-//    }
-
-    if (row == null) {
-      // do nothing, let scan to proceed
-      return SatisfiesCode.YES;
-    }
-    length = Math.min(length, fuzzyKeyBytes.length);
-    int numWords = length / Bytes.SIZEOF_LONG;
-    int offsetAdj = offset + UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
-
-    int j = numWords << 3; // numWords * SIZEOF_LONG;
-
-    for (int i = 0; i < j; i += Bytes.SIZEOF_LONG) {
-
-      long fuzzyBytes =
-          UnsafeAccess.theUnsafe.getLong(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
-              + (long) i);
-      long fuzzyMeta =
-          UnsafeAccess.theUnsafe.getLong(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
-              + (long) i);
-      long rowValue = UnsafeAccess.theUnsafe.getLong(row, offsetAdj + (long) i);
-      if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
-        // We always return NEXT_EXISTS
-        return SatisfiesCode.NEXT_EXISTS;
-      }
-    }
-
-    int off = j;
-
-    if (length - off >= Bytes.SIZEOF_INT) {
-      int fuzzyBytes =
-          UnsafeAccess.theUnsafe.getInt(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
-              + (long) off);
-      int fuzzyMeta =
-          UnsafeAccess.theUnsafe.getInt(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
-              + (long) off);
-      int rowValue = UnsafeAccess.theUnsafe.getInt(row, offsetAdj + (long) off);
-      if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
-        // We always return NEXT_EXISTS
-        return SatisfiesCode.NEXT_EXISTS;
-      }
-      off += Bytes.SIZEOF_INT;
-    }
-
-    if (length - off >= Bytes.SIZEOF_SHORT) {
-      short fuzzyBytes =
-          UnsafeAccess.theUnsafe.getShort(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
-              + (long) off);
-      short fuzzyMeta =
-          UnsafeAccess.theUnsafe.getShort(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
-              + (long) off);
-      short rowValue = UnsafeAccess.theUnsafe.getShort(row, offsetAdj + (long) off);
-      if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
-        // We always return NEXT_EXISTS
-        // even if it does not (in this case getNextForFuzzyRule
-        // will return null)
-        return SatisfiesCode.NEXT_EXISTS;
-      }
-      off += Bytes.SIZEOF_SHORT;
-    }
-
-    if (length - off >= Bytes.SIZEOF_BYTE) {
-      int fuzzyBytes = fuzzyKeyBytes[off] & 0xff;
-      int fuzzyMeta = fuzzyKeyMeta[off] & 0xff;
-      int rowValue = row[offset + off] & 0xff;
-      if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
-        // We always return NEXT_EXISTS
-        return SatisfiesCode.NEXT_EXISTS;
-      }
-    }
-    return SatisfiesCode.YES;
-  }
-
-  static SatisfiesCode satisfiesNoUnsafe(boolean reverse, byte[] row, int offset, int length,
-      byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
-    if (row == null) {
-      // do nothing, let scan to proceed
-      return SatisfiesCode.YES;
-    }
-
-    Order order = Order.orderFor(reverse);
-    boolean nextRowKeyCandidateExists = false;
-
-    for (int i = 0; i < fuzzyKeyMeta.length && i < length; i++) {
-      // First, checking if this position is fixed and not equals the given one
-      boolean byteAtPositionFixed = fuzzyKeyMeta[i] == 0;
-      boolean fixedByteIncorrect = byteAtPositionFixed && fuzzyKeyBytes[i] != row[i + offset];
-      if (fixedByteIncorrect) {
-        // in this case there's another row that satisfies fuzzy rule and bigger than this row
-        if (nextRowKeyCandidateExists) {
-          return SatisfiesCode.NEXT_EXISTS;
-        }
-
-        // If this row byte is less than fixed then there's a byte array bigger than
-        // this row and which satisfies the fuzzy rule. Otherwise there's no such byte array:
-        // this row is simply bigger than any byte array that satisfies the fuzzy rule
-        boolean rowByteLessThanFixed = (row[i + offset] & 0xFF) < (fuzzyKeyBytes[i] & 0xFF);
-        if (rowByteLessThanFixed && !reverse) {
-          return SatisfiesCode.NEXT_EXISTS;
-        } else if (!rowByteLessThanFixed && reverse) {
-          return SatisfiesCode.NEXT_EXISTS;
-        } else {
-          return SatisfiesCode.NO_NEXT;
-        }
-      }
-
-      // Second, checking if this position is not fixed and byte value is not the biggest. In this
-      // case there's a byte array bigger than this row and which satisfies the fuzzy rule. To get
-      // bigger byte array that satisfies the rule we need to just increase this byte
-      // (see the code of getNextForFuzzyRule below) by one.
-      // Note: if non-fixed byte is already at biggest value, this doesn't allow us to say there's
-      // bigger one that satisfies the rule as it can't be increased.
-      if (fuzzyKeyMeta[i] == 1 && !order.isMax(fuzzyKeyBytes[i])) {
-        nextRowKeyCandidateExists = true;
-      }
-    }
-    return SatisfiesCode.YES;
-  }
-
-  @VisibleForTesting
-  static byte[] getNextForFuzzyRule(byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
-    return getNextForFuzzyRule(false, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
-  }
-
-  @VisibleForTesting
-  static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, byte[] fuzzyKeyBytes,
-      byte[] fuzzyKeyMeta) {
-    return getNextForFuzzyRule(reverse, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
-  }
-
-  /** Abstracts directional comparisons based on scan direction. */
-  private enum Order {
-    ASC {
-      public boolean lt(int lhs, int rhs) {
-        return lhs < rhs;
-      }
-
-      public boolean gt(int lhs, int rhs) {
-        return lhs > rhs;
-      }
-
-      public byte inc(byte val) {
-        // TODO: what about over/underflow?
-        return (byte) (val + 1);
-      }
-
-      public boolean isMax(byte val) {
-        return val == (byte) 0xff;
-      }
-
-      public byte min() {
-        return 0;
-      }
-    },
-    DESC {
-      public boolean lt(int lhs, int rhs) {
-        return lhs > rhs;
-      }
-
-      public boolean gt(int lhs, int rhs) {
-        return lhs < rhs;
-      }
-
-      public byte inc(byte val) {
-        // TODO: what about over/underflow?
-        return (byte) (val - 1);
-      }
-
-      public boolean isMax(byte val) {
-        return val == 0;
-      }
-
-      public byte min() {
-        return (byte) 0xFF;
-      }
-    };
-
-    public static Order orderFor(boolean reverse) {
-      return reverse ? DESC : ASC;
-    }
-
-    /** Returns true when {@code lhs < rhs}. */
-    public abstract boolean lt(int lhs, int rhs);
-
-    /** Returns true when {@code lhs > rhs}. */
-    public abstract boolean gt(int lhs, int rhs);
-
-    /** Returns {@code val} incremented by 1. */
-    public abstract byte inc(byte val);
-
-    /** Return true when {@code val} is the maximum value */
-    public abstract boolean isMax(byte val);
-
-    /** Return the minimum value according to this ordering scheme. */
-    public abstract byte min();
-  }
-
-  /**
-   * @return greater byte array than given (row) which satisfies the fuzzy rule if it exists, null
-   *         otherwise
-   */
-  @VisibleForTesting
-  static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, int offset, int length,
-      byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
-    // To find out the next "smallest" byte array that satisfies fuzzy rule and "greater" than
-    // the given one we do the following:
-    // 1. setting values on all "fixed" positions to the values from fuzzyKeyBytes
-    // 2. if during the first step given row did not increase, then we increase the value at
-    // the first "non-fixed" position (where it is not maximum already)
-
-    // It is easier to perform this by using fuzzyKeyBytes copy and setting "non-fixed" position
-    // values than otherwise.
-    byte[] result =
-        Arrays.copyOf(fuzzyKeyBytes, length > fuzzyKeyBytes.length ? length : fuzzyKeyBytes.length);
-    if (reverse && length > fuzzyKeyBytes.length) {
-      // we need trailing 0xff's instead of trailing 0x00's
-      for (int i = fuzzyKeyBytes.length; i < result.length; i++) {
-        result[i] = (byte) 0xFF;
-      }
-    }
-    int toInc = -1;
-    final Order order = Order.orderFor(reverse);
-
-    boolean increased = false;
-    for (int i = 0; i < result.length; i++) {
-      if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* non-fixed */) {
-        result[i] = row[offset + i];
-        if (!order.isMax(row[offset + i])) {
-          // this is "non-fixed" position and is not at max value, hence we can increase it
-          toInc = i;
-        }
-      } else if (i < fuzzyKeyMeta.length && fuzzyKeyMeta[i] == -1 /* fixed */) {
-        if (order.lt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 0xFF))) {
-          // if setting value for any fixed position increased the original array,
-          // we are OK
-          increased = true;
-          break;
-        }
-
-        if (order.gt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 0xFF))) {
-          // if setting value for any fixed position makes array "smaller", then just stop:
-          // in case we found some non-fixed position to increase we will do it, otherwise
-          // there's no "next" row key that satisfies fuzzy rule and "greater" than given row
-          break;
-        }
-      }
-    }
-
-    if (!increased) {
-      if (toInc < 0) {
-        return null;
-      }
-      result[toInc] = order.inc(result[toInc]);
-
-      // Setting all "non-fixed" positions to zeroes to the right of the one we increased so
-      // that found "next" row key is the smallest possible
-      for (int i = toInc + 1; i < result.length; i++) {
-        if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* non-fixed */) {
-          result[i] = order.min();
-        }
-      }
-    }
-
-    return reverse? result: trimTrailingZeroes(result, fuzzyKeyMeta, toInc);
-  }
-
-  /**
-   * For forward scanner, next cell hint should  not contain any trailing zeroes
-   * unless they are part of fuzzyKeyMeta
-   * hint = '\x01\x01\x01\x00\x00'
-   * will skip valid row '\x01\x01\x01'
-   * 
-   * @param result
-   * @param fuzzyKeyMeta
-   * @param toInc - position of incremented byte
-   * @return trimmed version of result
-   */
-  
-  private static byte[] trimTrailingZeroes(byte[] result, byte[] fuzzyKeyMeta, int toInc) {
-    int off = fuzzyKeyMeta.length >= result.length? result.length -1:
-           fuzzyKeyMeta.length -1;  
-    for( ; off >= 0; off--){
-      if(fuzzyKeyMeta[off] != 0) break;
-    }
-    if (off < toInc)  off = toInc;
-    byte[] retValue = new byte[off+1];
-    System.arraycopy(result, 0, retValue, 0, retValue.length);
-    return retValue;
-  }
-
-  /**
-   * @return true if and only if the fields of the filter that are serialized are equal to the
-   *         corresponding fields in other. Used for testing.
-   */
-  boolean areSerializedFieldsEqual(Filter o) {
-    if (o == this) return true;
-    if (!(o instanceof FuzzyRowFilter)) return false;
-
-    FuzzyRowFilter other = (FuzzyRowFilter) o;
-    if (this.fuzzyKeysData.size() != other.fuzzyKeysData.size()) return false;
-    for (int i = 0; i < fuzzyKeysData.size(); ++i) {
-      Pair<byte[], byte[]> thisData = this.fuzzyKeysData.get(i);
-      Pair<byte[], byte[]> otherData = other.fuzzyKeysData.get(i);
-      if (!(Bytes.equals(thisData.getFirst(), otherData.getFirst()) && Bytes.equals(
-        thisData.getSecond(), otherData.getSecond()))) {
-        return false;
-      }
-    }
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/bbdf433a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 05b34c7..0aee202 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.Bytes;

http://git-wip-us.apache.org/repos/asf/kylin/blob/bbdf433a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/FuzzyRowFilter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/FuzzyRowFilter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/FuzzyRowFilter.java
new file mode 100644
index 0000000..2c9579e
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/FuzzyRowFilter.java
@@ -0,0 +1,638 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.UnsafeAccess;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * This is optimized version of a standard FuzzyRowFilter Filters data based on fuzzy row key.
+ * Performs fast-forwards during scanning. It takes pairs (row key, fuzzy info) to match row keys.
+ * Where fuzzy info is a byte array with 0 or 1 as its values:
+ * <ul>
+ * <li>0 - means that this byte in provided row key is fixed, i.e. row key's byte at same position
+ * must match</li>
+ * <li>1 - means that this byte in provided row key is NOT fixed, i.e. row key's byte at this
+ * position can be different from the one in provided row key</li>
+ * </ul>
+ * Example: Let's assume row key format is userId_actionId_year_month. Length of userId is fixed and
+ * is 4, length of actionId is 2 and year and month are 4 and 2 bytes long respectively. Let's
+ * assume that we need to fetch all users that performed certain action (encoded as "99") in Jan of
+ * any year. Then the pair (row key, fuzzy info) would be the following: row key = "????_99_????_01"
+ * (one can use any value instead of "?") fuzzy info =
+ * "\x01\x01\x01\x01\x00\x00\x00\x00\x01\x01\x01\x01\x00\x00\x00" I.e. fuzzy info tells the matching
+ * mask is "????_99_????_01", where at ? can be any value.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class FuzzyRowFilter extends FilterBase {
+  private List<Pair<byte[], byte[]>> fuzzyKeysData;
+  private boolean done = false;
+
+  /**
+   * The index of a last successfully found matching fuzzy string (in fuzzyKeysData). We will start
+   * matching next KV with this one. If they do not match then we will return back to the one-by-one
+   * iteration over fuzzyKeysData.hbas
+   */
+  private int lastFoundIndex = -1;
+
+  /**
+   * Row tracker (keeps all next rows after SEEK_NEXT_USING_HINT was returned)
+   */
+  private RowTracker tracker;
+
+  public FuzzyRowFilter(List<Pair<byte[], byte[]>> fuzzyKeysData) {
+    Pair<byte[], byte[]> p;
+    for (int i = 0; i < fuzzyKeysData.size(); i++) {
+      p = fuzzyKeysData.get(i);
+      if (p.getFirst().length != p.getSecond().length) {
+        Pair<String, String> readable =
+            new Pair<String, String>(Bytes.toStringBinary(p.getFirst()), Bytes.toStringBinary(p
+                .getSecond()));
+        throw new IllegalArgumentException("Fuzzy pair lengths do not match: " + readable);
+      }
+      // update mask ( 0 -> -1 (0xff), 1 -> 0)
+      p.setSecond(preprocessMask(p.getSecond()));
+      preprocessSearchKey(p);
+    }
+    this.fuzzyKeysData = fuzzyKeysData;
+    this.tracker = new RowTracker();
+  }
+
+  private void preprocessSearchKey(Pair<byte[], byte[]> p) {
+//    if (UnsafeAccess.unaligned() == false) {
+//      return;
+//    }
+    byte[] key = p.getFirst();
+    byte[] mask = p.getSecond();
+    for (int i = 0; i < mask.length; i++) {
+      // set non-fixed part of a search key to 0.
+      if (mask[i] == 0) key[i] = 0;
+    }
+  }
+
+  /**
+   * We need to preprocess mask array, as since we treat 0's as unfixed positions and -1 (0xff) as
+   * fixed positions
+   * @param mask
+   * @return mask array
+   */
+  private byte[] preprocessMask(byte[] mask) {
+//    if (UnsafeAccess.unaligned() == false) {
+//      return mask;
+//    }
+    if (isPreprocessedMask(mask)) return mask;
+    for (int i = 0; i < mask.length; i++) {
+      if (mask[i] == 0) {
+        mask[i] = -1; // 0 -> -1
+      } else if (mask[i] == 1) {
+        mask[i] = 0;// 1 -> 0
+      }
+    }
+    return mask;
+  }
+
+  private boolean isPreprocessedMask(byte[] mask) {
+    for (int i = 0; i < mask.length; i++) {
+      if (mask[i] != -1 && mask[i] != 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(Cell c) {
+    final int startIndex = lastFoundIndex >= 0 ? lastFoundIndex : 0;
+    final int size = fuzzyKeysData.size();
+    for (int i = startIndex; i < size + startIndex; i++) {
+      final int index = i % size;
+      Pair<byte[], byte[]> fuzzyData = fuzzyKeysData.get(index);
+      SatisfiesCode satisfiesCode =
+          satisfies(isReversed(), c.getRowArray(), c.getRowOffset(), c.getRowLength(),
+            fuzzyData.getFirst(), fuzzyData.getSecond());
+      if (satisfiesCode == SatisfiesCode.YES) {
+        lastFoundIndex = index;
+        return ReturnCode.INCLUDE;
+      }
+    }
+    // NOT FOUND -> seek next using hint
+    lastFoundIndex = -1;
+
+    return ReturnCode.SEEK_NEXT_USING_HINT;
+
+  }
+
+  @Override
+  public Cell getNextCellHint(Cell currentCell) {
+    boolean result = tracker.updateTracker(currentCell);
+    if (result == false) {
+      done = true;
+      return null;
+    }
+    byte[] nextRowKey = tracker.nextRow();
+    return KeyValueUtil.createFirstOnRow(nextRowKey);
+  }
+
+  /**
+   * If we have multiple fuzzy keys, row tracker should improve overall performance. It calculates
+   * all next rows (one per every fuzzy key) and put them (the fuzzy key is bundled) into a priority
+   * queue so that the smallest row key always appears at queue head, which helps to decide the
+   * "Next Cell Hint". As scanning going on, the number of candidate rows in the RowTracker will
+   * remain the size of fuzzy keys until some of the fuzzy keys won't possibly have matches any
+   * more.
+   */
+  private class RowTracker {
+    private final PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>> nextRows;
+    private boolean initialized = false;
+
+    RowTracker() {
+      nextRows =
+          new PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>>(fuzzyKeysData.size(),
+              new Comparator<Pair<byte[], Pair<byte[], byte[]>>>() {
+                @Override
+                public int compare(Pair<byte[], Pair<byte[], byte[]>> o1,
+                    Pair<byte[], Pair<byte[], byte[]>> o2) {
+                  int compare = Bytes.compareTo(o1.getFirst(), o2.getFirst());
+                  if (!isReversed()) {
+                    return compare;
+                  } else {
+                    return -compare;
+                  }
+                }
+              });
+    }
+
+    byte[] nextRow() {
+      if (nextRows.isEmpty()) {
+        throw new IllegalStateException(
+            "NextRows should not be empty, make sure to call nextRow() after updateTracker() return true");
+      } else {
+        return nextRows.peek().getFirst();
+      }
+    }
+
+    boolean updateTracker(Cell currentCell) {
+      if (!initialized) {
+        for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
+          updateWith(currentCell, fuzzyData);
+        }
+        initialized = true;
+      } else {
+        while (!nextRows.isEmpty() && !lessThan(currentCell, nextRows.peek().getFirst())) {
+          Pair<byte[], Pair<byte[], byte[]>> head = nextRows.poll();
+          Pair<byte[], byte[]> fuzzyData = head.getSecond();
+          updateWith(currentCell, fuzzyData);
+        }
+      }
+      return !nextRows.isEmpty();
+    }
+
+    boolean lessThan(Cell currentCell, byte[] nextRowKey) {
+      int compareResult =
+          Bytes.compareTo(currentCell.getRowArray(), currentCell.getRowOffset(),
+            currentCell.getRowLength(), nextRowKey, 0, nextRowKey.length);
+      return (!isReversed() && compareResult < 0) || (isReversed() && compareResult > 0);
+    }
+
+    void updateWith(Cell currentCell, Pair<byte[], byte[]> fuzzyData) {
+      byte[] nextRowKeyCandidate =
+          getNextForFuzzyRule(isReversed(), currentCell.getRowArray(), currentCell.getRowOffset(),
+            currentCell.getRowLength(), fuzzyData.getFirst(), fuzzyData.getSecond());
+      if (nextRowKeyCandidate != null) {
+        nextRows.add(new Pair<byte[], Pair<byte[], byte[]>>(nextRowKeyCandidate, fuzzyData));
+      }
+    }
+
+  }
+
+  @Override
+  public boolean filterAllRemaining() {
+    return done;
+  }
+
+  /**
+   * @return The filter serialized using pb
+   */
+  public byte[] toByteArray() {
+    FilterProtos.FuzzyRowFilter.Builder builder = FilterProtos.FuzzyRowFilter.newBuilder();
+    for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
+      BytesBytesPair.Builder bbpBuilder = BytesBytesPair.newBuilder();
+      bbpBuilder.setFirst(ByteStringer.wrap(fuzzyData.getFirst()));
+      bbpBuilder.setSecond(ByteStringer.wrap(fuzzyData.getSecond()));
+      builder.addFuzzyKeysData(bbpBuilder);
+    }
+    return builder.build().toByteArray();
+  }
+
+  /**
+   * @param pbBytes A pb serialized {@link FuzzyRowFilter} instance
+   * @return An instance of {@link FuzzyRowFilter} made from <code>bytes</code>
+   * @throws DeserializationException
+   * @see #toByteArray
+   */
+  public static FuzzyRowFilter parseFrom(final byte[] pbBytes) throws DeserializationException {
+    FilterProtos.FuzzyRowFilter proto;
+    try {
+      proto = FilterProtos.FuzzyRowFilter.parseFrom(pbBytes);
+    } catch (InvalidProtocolBufferException e) {
+      throw new DeserializationException(e);
+    }
+    int count = proto.getFuzzyKeysDataCount();
+    ArrayList<Pair<byte[], byte[]>> fuzzyKeysData = new ArrayList<Pair<byte[], byte[]>>(count);
+    for (int i = 0; i < count; ++i) {
+      BytesBytesPair current = proto.getFuzzyKeysData(i);
+      byte[] keyBytes = current.getFirst().toByteArray();
+      byte[] keyMeta = current.getSecond().toByteArray();
+      fuzzyKeysData.add(new Pair<byte[], byte[]>(keyBytes, keyMeta));
+    }
+    return new FuzzyRowFilter(fuzzyKeysData);
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("FuzzyRowFilter");
+    sb.append("{fuzzyKeysData=");
+    for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
+      sb.append('{').append(Bytes.toStringBinary(fuzzyData.getFirst())).append(":");
+      sb.append(Bytes.toStringBinary(fuzzyData.getSecond())).append('}');
+    }
+    sb.append("}, ");
+    return sb.toString();
+  }
+
+  // Utility methods
+
+  static enum SatisfiesCode {
+    /** row satisfies fuzzy rule */
+    YES,
+    /** row doesn't satisfy fuzzy rule, but there's possible greater row that does */
+    NEXT_EXISTS,
+    /** row doesn't satisfy fuzzy rule and there's no greater row that does */
+    NO_NEXT
+  }
+
+  @VisibleForTesting
+  static SatisfiesCode satisfies(byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+    return satisfies(false, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
+  }
+
+  @VisibleForTesting
+  static SatisfiesCode satisfies(boolean reverse, byte[] row, byte[] fuzzyKeyBytes,
+      byte[] fuzzyKeyMeta) {
+    return satisfies(reverse, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
+  }
+
+  static SatisfiesCode satisfies(boolean reverse, byte[] row, int offset, int length,
+      byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+
+//    if (UnsafeAccess.unaligned() == false) {
+//      return satisfiesNoUnsafe(reverse, row, offset, length, fuzzyKeyBytes, fuzzyKeyMeta);
+//    }
+
+    if (row == null) {
+      // do nothing, let scan to proceed
+      return SatisfiesCode.YES;
+    }
+    length = Math.min(length, fuzzyKeyBytes.length);
+    int numWords = length / Bytes.SIZEOF_LONG;
+    int offsetAdj = offset + UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
+
+    int j = numWords << 3; // numWords * SIZEOF_LONG;
+
+    for (int i = 0; i < j; i += Bytes.SIZEOF_LONG) {
+
+      long fuzzyBytes =
+          UnsafeAccess.theUnsafe.getLong(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+              + (long) i);
+      long fuzzyMeta =
+          UnsafeAccess.theUnsafe.getLong(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+              + (long) i);
+      long rowValue = UnsafeAccess.theUnsafe.getLong(row, offsetAdj + (long) i);
+      if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+        // We always return NEXT_EXISTS
+        return SatisfiesCode.NEXT_EXISTS;
+      }
+    }
+
+    int off = j;
+
+    if (length - off >= Bytes.SIZEOF_INT) {
+      int fuzzyBytes =
+          UnsafeAccess.theUnsafe.getInt(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+              + (long) off);
+      int fuzzyMeta =
+          UnsafeAccess.theUnsafe.getInt(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+              + (long) off);
+      int rowValue = UnsafeAccess.theUnsafe.getInt(row, offsetAdj + (long) off);
+      if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+        // We always return NEXT_EXISTS
+        return SatisfiesCode.NEXT_EXISTS;
+      }
+      off += Bytes.SIZEOF_INT;
+    }
+
+    if (length - off >= Bytes.SIZEOF_SHORT) {
+      short fuzzyBytes =
+          UnsafeAccess.theUnsafe.getShort(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+              + (long) off);
+      short fuzzyMeta =
+          UnsafeAccess.theUnsafe.getShort(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+              + (long) off);
+      short rowValue = UnsafeAccess.theUnsafe.getShort(row, offsetAdj + (long) off);
+      if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+        // We always return NEXT_EXISTS
+        // even if it does not (in this case getNextForFuzzyRule
+        // will return null)
+        return SatisfiesCode.NEXT_EXISTS;
+      }
+      off += Bytes.SIZEOF_SHORT;
+    }
+
+    if (length - off >= Bytes.SIZEOF_BYTE) {
+      int fuzzyBytes = fuzzyKeyBytes[off] & 0xff;
+      int fuzzyMeta = fuzzyKeyMeta[off] & 0xff;
+      int rowValue = row[offset + off] & 0xff;
+      if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+        // We always return NEXT_EXISTS
+        return SatisfiesCode.NEXT_EXISTS;
+      }
+    }
+    return SatisfiesCode.YES;
+  }
+
+  static SatisfiesCode satisfiesNoUnsafe(boolean reverse, byte[] row, int offset, int length,
+      byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+    if (row == null) {
+      // do nothing, let scan to proceed
+      return SatisfiesCode.YES;
+    }
+
+    Order order = Order.orderFor(reverse);
+    boolean nextRowKeyCandidateExists = false;
+
+    for (int i = 0; i < fuzzyKeyMeta.length && i < length; i++) {
+      // First, checking if this position is fixed and not equals the given one
+      boolean byteAtPositionFixed = fuzzyKeyMeta[i] == 0;
+      boolean fixedByteIncorrect = byteAtPositionFixed && fuzzyKeyBytes[i] != row[i + offset];
+      if (fixedByteIncorrect) {
+        // in this case there's another row that satisfies fuzzy rule and bigger than this row
+        if (nextRowKeyCandidateExists) {
+          return SatisfiesCode.NEXT_EXISTS;
+        }
+
+        // If this row byte is less than fixed then there's a byte array bigger than
+        // this row and which satisfies the fuzzy rule. Otherwise there's no such byte array:
+        // this row is simply bigger than any byte array that satisfies the fuzzy rule
+        boolean rowByteLessThanFixed = (row[i + offset] & 0xFF) < (fuzzyKeyBytes[i] & 0xFF);
+        if (rowByteLessThanFixed && !reverse) {
+          return SatisfiesCode.NEXT_EXISTS;
+        } else if (!rowByteLessThanFixed && reverse) {
+          return SatisfiesCode.NEXT_EXISTS;
+        } else {
+          return SatisfiesCode.NO_NEXT;
+        }
+      }
+
+      // Second, checking if this position is not fixed and byte value is not the biggest. In this
+      // case there's a byte array bigger than this row and which satisfies the fuzzy rule. To get
+      // bigger byte array that satisfies the rule we need to just increase this byte
+      // (see the code of getNextForFuzzyRule below) by one.
+      // Note: if non-fixed byte is already at biggest value, this doesn't allow us to say there's
+      // bigger one that satisfies the rule as it can't be increased.
+      if (fuzzyKeyMeta[i] == 1 && !order.isMax(fuzzyKeyBytes[i])) {
+        nextRowKeyCandidateExists = true;
+      }
+    }
+    return SatisfiesCode.YES;
+  }
+
+  @VisibleForTesting
+  static byte[] getNextForFuzzyRule(byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+    return getNextForFuzzyRule(false, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
+  }
+
+  @VisibleForTesting
+  static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, byte[] fuzzyKeyBytes,
+      byte[] fuzzyKeyMeta) {
+    return getNextForFuzzyRule(reverse, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
+  }
+
+  /** Abstracts directional comparisons based on scan direction. */
+  private enum Order {
+    ASC {
+      public boolean lt(int lhs, int rhs) {
+        return lhs < rhs;
+      }
+
+      public boolean gt(int lhs, int rhs) {
+        return lhs > rhs;
+      }
+
+      public byte inc(byte val) {
+        // TODO: what about over/underflow?
+        return (byte) (val + 1);
+      }
+
+      public boolean isMax(byte val) {
+        return val == (byte) 0xff;
+      }
+
+      public byte min() {
+        return 0;
+      }
+    },
+    DESC {
+      public boolean lt(int lhs, int rhs) {
+        return lhs > rhs;
+      }
+
+      public boolean gt(int lhs, int rhs) {
+        return lhs < rhs;
+      }
+
+      public byte inc(byte val) {
+        // TODO: what about over/underflow?
+        return (byte) (val - 1);
+      }
+
+      public boolean isMax(byte val) {
+        return val == 0;
+      }
+
+      public byte min() {
+        return (byte) 0xFF;
+      }
+    };
+
+    public static Order orderFor(boolean reverse) {
+      return reverse ? DESC : ASC;
+    }
+
+    /** Returns true when {@code lhs < rhs}. */
+    public abstract boolean lt(int lhs, int rhs);
+
+    /** Returns true when {@code lhs > rhs}. */
+    public abstract boolean gt(int lhs, int rhs);
+
+    /** Returns {@code val} incremented by 1. */
+    public abstract byte inc(byte val);
+
+    /** Return true when {@code val} is the maximum value */
+    public abstract boolean isMax(byte val);
+
+    /** Return the minimum value according to this ordering scheme. */
+    public abstract byte min();
+  }
+
+  /**
+   * @return greater byte array than given (row) which satisfies the fuzzy rule if it exists, null
+   *         otherwise
+   */
+  @VisibleForTesting
+  static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, int offset, int length,
+      byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+    // To find out the next "smallest" byte array that satisfies fuzzy rule and "greater" than
+    // the given one we do the following:
+    // 1. setting values on all "fixed" positions to the values from fuzzyKeyBytes
+    // 2. if during the first step given row did not increase, then we increase the value at
+    // the first "non-fixed" position (where it is not maximum already)
+
+    // It is easier to perform this by using fuzzyKeyBytes copy and setting "non-fixed" position
+    // values than otherwise.
+    byte[] result =
+        Arrays.copyOf(fuzzyKeyBytes, length > fuzzyKeyBytes.length ? length : fuzzyKeyBytes.length);
+    if (reverse && length > fuzzyKeyBytes.length) {
+      // we need trailing 0xff's instead of trailing 0x00's
+      for (int i = fuzzyKeyBytes.length; i < result.length; i++) {
+        result[i] = (byte) 0xFF;
+      }
+    }
+    int toInc = -1;
+    final Order order = Order.orderFor(reverse);
+
+    boolean increased = false;
+    for (int i = 0; i < result.length; i++) {
+      if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* non-fixed */) {
+        result[i] = row[offset + i];
+        if (!order.isMax(row[offset + i])) {
+          // this is "non-fixed" position and is not at max value, hence we can increase it
+          toInc = i;
+        }
+      } else if (i < fuzzyKeyMeta.length && fuzzyKeyMeta[i] == -1 /* fixed */) {
+        if (order.lt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 0xFF))) {
+          // if setting value for any fixed position increased the original array,
+          // we are OK
+          increased = true;
+          break;
+        }
+
+        if (order.gt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 0xFF))) {
+          // if setting value for any fixed position makes array "smaller", then just stop:
+          // in case we found some non-fixed position to increase we will do it, otherwise
+          // there's no "next" row key that satisfies fuzzy rule and "greater" than given row
+          break;
+        }
+      }
+    }
+
+    if (!increased) {
+      if (toInc < 0) {
+        return null;
+      }
+      result[toInc] = order.inc(result[toInc]);
+
+      // Setting all "non-fixed" positions to zeroes to the right of the one we increased so
+      // that found "next" row key is the smallest possible
+      for (int i = toInc + 1; i < result.length; i++) {
+        if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* non-fixed */) {
+          result[i] = order.min();
+        }
+      }
+    }
+
+    return reverse? result: trimTrailingZeroes(result, fuzzyKeyMeta, toInc);
+  }
+
+  /**
+   * For forward scanner, next cell hint should  not contain any trailing zeroes
+   * unless they are part of fuzzyKeyMeta
+   * hint = '\x01\x01\x01\x00\x00'
+   * will skip valid row '\x01\x01\x01'
+   * 
+   * @param result
+   * @param fuzzyKeyMeta
+   * @param toInc - position of incremented byte
+   * @return trimmed version of result
+   */
+  
+  private static byte[] trimTrailingZeroes(byte[] result, byte[] fuzzyKeyMeta, int toInc) {
+    int off = fuzzyKeyMeta.length >= result.length? result.length -1:
+           fuzzyKeyMeta.length -1;  
+    for( ; off >= 0; off--){
+      if(fuzzyKeyMeta[off] != 0) break;
+    }
+    if (off < toInc)  off = toInc;
+    byte[] retValue = new byte[off+1];
+    System.arraycopy(result, 0, retValue, 0, retValue.length);
+    return retValue;
+  }
+
+  /**
+   * @return true if and only if the fields of the filter that are serialized are equal to the
+   *         corresponding fields in other. Used for testing.
+   */
+  boolean areSerializedFieldsEqual(Filter o) {
+    if (o == this) return true;
+    if (!(o instanceof FuzzyRowFilter)) return false;
+
+    FuzzyRowFilter other = (FuzzyRowFilter) o;
+    if (this.fuzzyKeysData.size() != other.fuzzyKeysData.size()) return false;
+    for (int i = 0; i < fuzzyKeysData.size(); ++i) {
+      Pair<byte[], byte[]> thisData = this.fuzzyKeysData.get(i);
+      Pair<byte[], byte[]> otherData = other.fuzzyKeysData.get(i);
+      if (!(Bytes.equals(thisData.getFirst(), otherData.getFirst()) && Bytes.equals(
+        thisData.getSecond(), otherData.getSecond()))) {
+        return false;
+      }
+    }
+    return true;
+  }
+}