Posted to commits@kylin.apache.org by ma...@apache.org on 2016/03/02 10:16:12 UTC

[1/3] kylin git commit: KYLIN-920 & KYLIN-782 & KYLIN-1422 Upgrade to HBase 1.1 (with help from murkrishn) [Forced Update!]

Repository: kylin
Updated Branches:
  refs/heads/1.x-HBase1.1.3 b5180d69a -> 4313900e6 (forced update)


http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/hbase/filter/FuzzyRowFilter.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/filter/FuzzyRowFilter.java b/storage/src/main/java/org/apache/kylin/storage/hbase/filter/FuzzyRowFilter.java
new file mode 100644
index 0000000..9d21f98
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/filter/FuzzyRowFilter.java
@@ -0,0 +1,636 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.storage.hbase.filter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * This is an optimized version of the standard FuzzyRowFilter. It filters data based on fuzzy row
+ * keys and performs fast-forwards during scanning. It takes pairs (row key, fuzzy info) to match
+ * row keys, where fuzzy info is a byte array with 0 or 1 as its values:
+ * <ul>
+ * <li>0 - means that this byte in the provided row key is fixed, i.e. the row key's byte at the
+ * same position must match</li>
+ * <li>1 - means that this byte in the provided row key is NOT fixed, i.e. the row key's byte at
+ * this position can differ from the one in the provided row key</li>
+ * </ul>
+ * Example: Let's assume the row key format is userId_actionId_year_month. The length of userId is
+ * fixed at 4, the length of actionId is 2, and year and month are 4 and 2 bytes long respectively.
+ * Let's assume that we need to fetch all users that performed a certain action (encoded as "99")
+ * in Jan of any year. Then the pair (row key, fuzzy info) would be: row key = "????_99_????_01"
+ * (one can use any value instead of "?") and fuzzy info =
+ * "\x01\x01\x01\x01\x00\x00\x00\x00\x01\x01\x01\x01\x00\x00\x00". That is, the fuzzy info tells
+ * that the matching mask is "????_99_????_01", where ? can be any value.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class FuzzyRowFilter extends FilterBase {
+    private List<Pair<byte[], byte[]>> fuzzyKeysData;
+    private boolean done = false;
+
+    /**
+     * The index of the last successfully matched fuzzy key (in fuzzyKeysData). We will start
+     * matching the next KV with this one. If they do not match, we return to the one-by-one
+     * iteration over fuzzyKeysData.
+     */
+    private int lastFoundIndex = -1;
+
+    /**
+     * Row tracker (keeps all next rows after SEEK_NEXT_USING_HINT was returned)
+     */
+    private RowTracker tracker;
+
+    public FuzzyRowFilter(List<Pair<byte[], byte[]>> fuzzyKeysData) {
+        Pair<byte[], byte[]> p;
+        for (int i = 0; i < fuzzyKeysData.size(); i++) {
+            p = fuzzyKeysData.get(i);
+            if (p.getFirst().length != p.getSecond().length) {
+                Pair<String, String> readable =
+                        new Pair<String, String>(Bytes.toStringBinary(p.getFirst()), Bytes.toStringBinary(p
+                                .getSecond()));
+                throw new IllegalArgumentException("Fuzzy pair lengths do not match: " + readable);
+            }
+            // update mask ( 0 -> -1 (0xff), 1 -> 0)
+            p.setSecond(preprocessMask(p.getSecond()));
+            preprocessSearchKey(p);
+        }
+        this.fuzzyKeysData = fuzzyKeysData;
+        this.tracker = new RowTracker();
+    }
+
+    private void preprocessSearchKey(Pair<byte[], byte[]> p) {
+        if (!UnsafeAccess.unaligned()) {
+            return;
+        }
+        byte[] key = p.getFirst();
+        byte[] mask = p.getSecond();
+        for (int i = 0; i < mask.length; i++) {
+            // set non-fixed part of a search key to 0.
+            if (mask[i] == 0) key[i] = 0;
+        }
+    }
+
+    /**
+     * We need to preprocess the mask array, since we treat 0s as unfixed positions and -1 (0xff)
+     * as fixed positions
+     * @param mask
+     * @return mask array
+     */
+    private byte[] preprocessMask(byte[] mask) {
+        if (!UnsafeAccess.unaligned()) {
+            return mask;
+        }
+        if (isPreprocessedMask(mask)) return mask;
+        for (int i = 0; i < mask.length; i++) {
+            if (mask[i] == 0) {
+                mask[i] = -1; // 0 -> -1
+            } else if (mask[i] == 1) {
+                mask[i] = 0;// 1 -> 0
+            }
+        }
+        return mask;
+    }
+
+    private boolean isPreprocessedMask(byte[] mask) {
+        for (int i = 0; i < mask.length; i++) {
+            if (mask[i] != -1 && mask[i] != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public ReturnCode filterKeyValue(Cell c) {
+        final int startIndex = lastFoundIndex >= 0 ? lastFoundIndex : 0;
+        final int size = fuzzyKeysData.size();
+        for (int i = startIndex; i < size + startIndex; i++) {
+            final int index = i % size;
+            Pair<byte[], byte[]> fuzzyData = fuzzyKeysData.get(index);
+            SatisfiesCode satisfiesCode =
+                    satisfies(isReversed(), c.getRowArray(), c.getRowOffset(), c.getRowLength(),
+                            fuzzyData.getFirst(), fuzzyData.getSecond());
+            if (satisfiesCode == SatisfiesCode.YES) {
+                lastFoundIndex = index;
+                return ReturnCode.INCLUDE;
+            }
+        }
+        // NOT FOUND -> seek next using hint
+        lastFoundIndex = -1;
+
+        return ReturnCode.SEEK_NEXT_USING_HINT;
+
+    }
+
+    @Override
+    public Cell getNextCellHint(Cell currentCell) {
+        boolean result = tracker.updateTracker(currentCell);
+        if (!result) {
+            done = true;
+            return null;
+        }
+        byte[] nextRowKey = tracker.nextRow();
+        return KeyValueUtil.createFirstOnRow(nextRowKey);
+    }
+
+    /**
+     * If we have multiple fuzzy keys, the row tracker should improve overall performance. It
+     * calculates all next rows (one per fuzzy key) and puts them (each bundled with its fuzzy key)
+     * into a priority queue so that the smallest row key always appears at the queue head, which
+     * helps to decide the "Next Cell Hint". As scanning goes on, the number of candidate rows in
+     * the RowTracker remains the number of fuzzy keys until some of the fuzzy keys can no longer
+     * possibly have matches.
+     */
+    private class RowTracker {
+        private final PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>> nextRows;
+        private boolean initialized = false;
+
+        RowTracker() {
+            nextRows =
+                    new PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>>(fuzzyKeysData.size(),
+                            new Comparator<Pair<byte[], Pair<byte[], byte[]>>>() {
+                                @Override
+                                public int compare(Pair<byte[], Pair<byte[], byte[]>> o1,
+                                        Pair<byte[], Pair<byte[], byte[]>> o2) {
+                                    int compare = Bytes.compareTo(o1.getFirst(), o2.getFirst());
+                                    if (!isReversed()) {
+                                        return compare;
+                                    } else {
+                                        return -compare;
+                                    }
+                                }
+                            });
+        }
+
+        byte[] nextRow() {
+            if (nextRows.isEmpty()) {
+                throw new IllegalStateException(
+                        "NextRows should not be empty, make sure to call nextRow() after updateTracker() return true");
+            } else {
+                return nextRows.peek().getFirst();
+            }
+        }
+
+        boolean updateTracker(Cell currentCell) {
+            if (!initialized) {
+                for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
+                    updateWith(currentCell, fuzzyData);
+                }
+                initialized = true;
+            } else {
+                while (!nextRows.isEmpty() && !lessThan(currentCell, nextRows.peek().getFirst())) {
+                    Pair<byte[], Pair<byte[], byte[]>> head = nextRows.poll();
+                    Pair<byte[], byte[]> fuzzyData = head.getSecond();
+                    updateWith(currentCell, fuzzyData);
+                }
+            }
+            return !nextRows.isEmpty();
+        }
+
+        boolean lessThan(Cell currentCell, byte[] nextRowKey) {
+            int compareResult =
+                    Bytes.compareTo(currentCell.getRowArray(), currentCell.getRowOffset(),
+                            currentCell.getRowLength(), nextRowKey, 0, nextRowKey.length);
+            return (!isReversed() && compareResult < 0) || (isReversed() && compareResult > 0);
+        }
+
+        void updateWith(Cell currentCell, Pair<byte[], byte[]> fuzzyData) {
+            byte[] nextRowKeyCandidate =
+                    getNextForFuzzyRule(isReversed(), currentCell.getRowArray(), currentCell.getRowOffset(),
+                            currentCell.getRowLength(), fuzzyData.getFirst(), fuzzyData.getSecond());
+            if (nextRowKeyCandidate != null) {
+                nextRows.add(new Pair<byte[], Pair<byte[], byte[]>>(nextRowKeyCandidate, fuzzyData));
+            }
+        }
+
+    }
+
+    @Override
+    public boolean filterAllRemaining() {
+        return done;
+    }
+
+    /**
+     * @return The filter serialized using pb
+     */
+    public byte[] toByteArray() {
+        FilterProtos.FuzzyRowFilter.Builder builder = FilterProtos.FuzzyRowFilter.newBuilder();
+        for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
+            BytesBytesPair.Builder bbpBuilder = BytesBytesPair.newBuilder();
+            bbpBuilder.setFirst(ByteStringer.wrap(fuzzyData.getFirst()));
+            bbpBuilder.setSecond(ByteStringer.wrap(fuzzyData.getSecond()));
+            builder.addFuzzyKeysData(bbpBuilder);
+        }
+        return builder.build().toByteArray();
+    }
+
+    /**
+     * @param pbBytes A pb serialized {@link FuzzyRowFilter} instance
+     * @return An instance of {@link FuzzyRowFilter} made from <code>bytes</code>
+     * @throws DeserializationException
+     * @see #toByteArray
+     */
+    public static FuzzyRowFilter parseFrom(final byte[] pbBytes) throws DeserializationException {
+        FilterProtos.FuzzyRowFilter proto;
+        try {
+            proto = FilterProtos.FuzzyRowFilter.parseFrom(pbBytes);
+        } catch (InvalidProtocolBufferException e) {
+            throw new DeserializationException(e);
+        }
+        int count = proto.getFuzzyKeysDataCount();
+        ArrayList<Pair<byte[], byte[]>> fuzzyKeysData = new ArrayList<Pair<byte[], byte[]>>(count);
+        for (int i = 0; i < count; ++i) {
+            BytesBytesPair current = proto.getFuzzyKeysData(i);
+            byte[] keyBytes = current.getFirst().toByteArray();
+            byte[] keyMeta = current.getSecond().toByteArray();
+            fuzzyKeysData.add(new Pair<byte[], byte[]>(keyBytes, keyMeta));
+        }
+        return new FuzzyRowFilter(fuzzyKeysData);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("FuzzyRowFilter");
+        sb.append("{fuzzyKeysData=");
+        for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
+            sb.append('{').append(Bytes.toStringBinary(fuzzyData.getFirst())).append(":");
+            sb.append(Bytes.toStringBinary(fuzzyData.getSecond())).append('}');
+        }
+        sb.append("}, ");
+        return sb.toString();
+    }
+
+    // Utility methods
+
+    enum SatisfiesCode {
+        /** row satisfies the fuzzy rule */
+        YES,
+        /** row doesn't satisfy the fuzzy rule, but a greater row that does may exist */
+        NEXT_EXISTS,
+        /** row doesn't satisfy the fuzzy rule and no greater row does */
+        NO_NEXT
+    }
+
+    @VisibleForTesting
+    static SatisfiesCode satisfies(byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+        return satisfies(false, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
+    }
+
+    @VisibleForTesting
+    static SatisfiesCode satisfies(boolean reverse, byte[] row, byte[] fuzzyKeyBytes,
+            byte[] fuzzyKeyMeta) {
+        return satisfies(reverse, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
+    }
+
+    static SatisfiesCode satisfies(boolean reverse, byte[] row, int offset, int length,
+            byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+
+        if (!UnsafeAccess.unaligned()) {
+            return satisfiesNoUnsafe(reverse, row, offset, length, fuzzyKeyBytes, fuzzyKeyMeta);
+        }
+
+        if (row == null) {
+            // do nothing, let the scan proceed
+            return SatisfiesCode.YES;
+        }
+        length = Math.min(length, fuzzyKeyBytes.length);
+        int numWords = length / Bytes.SIZEOF_LONG;
+        int offsetAdj = offset + UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
+
+        int j = numWords << 3; // numWords * SIZEOF_LONG;
+
+        for (int i = 0; i < j; i += Bytes.SIZEOF_LONG) {
+
+            long fuzzyBytes =
+                    UnsafeAccess.theUnsafe.getLong(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+                            + (long) i);
+            long fuzzyMeta =
+                    UnsafeAccess.theUnsafe.getLong(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+                            + (long) i);
+            long rowValue = UnsafeAccess.theUnsafe.getLong(row, offsetAdj + (long) i);
+            if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+                // We always return NEXT_EXISTS
+                return SatisfiesCode.NEXT_EXISTS;
+            }
+        }
+
+        int off = j;
+
+        if (length - off >= Bytes.SIZEOF_INT) {
+            int fuzzyBytes =
+                    UnsafeAccess.theUnsafe.getInt(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+                            + (long) off);
+            int fuzzyMeta =
+                    UnsafeAccess.theUnsafe.getInt(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+                            + (long) off);
+            int rowValue = UnsafeAccess.theUnsafe.getInt(row, offsetAdj + (long) off);
+            if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+                // We always return NEXT_EXISTS
+                return SatisfiesCode.NEXT_EXISTS;
+            }
+            off += Bytes.SIZEOF_INT;
+        }
+
+        if (length - off >= Bytes.SIZEOF_SHORT) {
+            short fuzzyBytes =
+                    UnsafeAccess.theUnsafe.getShort(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+                            + (long) off);
+            short fuzzyMeta =
+                    UnsafeAccess.theUnsafe.getShort(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
+                            + (long) off);
+            short rowValue = UnsafeAccess.theUnsafe.getShort(row, offsetAdj + (long) off);
+            if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+                // We always return NEXT_EXISTS
+                // even if it does not (in this case getNextForFuzzyRule
+                // will return null)
+                return SatisfiesCode.NEXT_EXISTS;
+            }
+            off += Bytes.SIZEOF_SHORT;
+        }
+
+        if (length - off >= Bytes.SIZEOF_BYTE) {
+            int fuzzyBytes = fuzzyKeyBytes[off] & 0xff;
+            int fuzzyMeta = fuzzyKeyMeta[off] & 0xff;
+            int rowValue = row[offset + off] & 0xff;
+            if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
+                // We always return NEXT_EXISTS
+                return SatisfiesCode.NEXT_EXISTS;
+            }
+        }
+        return SatisfiesCode.YES;
+    }
+
+    static SatisfiesCode satisfiesNoUnsafe(boolean reverse, byte[] row, int offset, int length,
+            byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+        if (row == null) {
+            // do nothing, let the scan proceed
+            return SatisfiesCode.YES;
+        }
+
+        Order order = Order.orderFor(reverse);
+        boolean nextRowKeyCandidateExists = false;
+
+        for (int i = 0; i < fuzzyKeyMeta.length && i < length; i++) {
+            // First, check if this position is fixed and does not equal the given one
+            boolean byteAtPositionFixed = fuzzyKeyMeta[i] == 0;
+            boolean fixedByteIncorrect = byteAtPositionFixed && fuzzyKeyBytes[i] != row[i + offset];
+            if (fixedByteIncorrect) {
+                // in this case there's another row that satisfies the fuzzy rule and is bigger than this row
+                if (nextRowKeyCandidateExists) {
+                    return SatisfiesCode.NEXT_EXISTS;
+                }
+
+                // If this row's byte is less than the fixed one, then there's a byte array bigger than
+                // this row which satisfies the fuzzy rule. Otherwise there's no such byte array:
+                // this row is simply bigger than any byte array that satisfies the fuzzy rule
+                boolean rowByteLessThanFixed = (row[i + offset] & 0xFF) < (fuzzyKeyBytes[i] & 0xFF);
+                if (rowByteLessThanFixed && !reverse) {
+                    return SatisfiesCode.NEXT_EXISTS;
+                } else if (!rowByteLessThanFixed && reverse) {
+                    return SatisfiesCode.NEXT_EXISTS;
+                } else {
+                    return SatisfiesCode.NO_NEXT;
+                }
+            }
+
+            // Second, check if this position is not fixed and the byte value is not the biggest. In
+            // this case there's a byte array bigger than this row which satisfies the fuzzy rule. To
+            // get a bigger byte array that satisfies the rule we just need to increase this byte by
+            // one (see the code of getNextForFuzzyRule below).
+            // Note: if a non-fixed byte is already at the biggest value, this doesn't allow us to say
+            // there's a bigger one that satisfies the rule, as it can't be increased.
+            if (fuzzyKeyMeta[i] == 1 && !order.isMax(fuzzyKeyBytes[i])) {
+                nextRowKeyCandidateExists = true;
+            }
+        }
+        return SatisfiesCode.YES;
+    }
+
+    @VisibleForTesting
+    static byte[] getNextForFuzzyRule(byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+        return getNextForFuzzyRule(false, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
+    }
+
+    @VisibleForTesting
+    static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, byte[] fuzzyKeyBytes,
+            byte[] fuzzyKeyMeta) {
+        return getNextForFuzzyRule(reverse, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta);
+    }
+
+    /** Abstracts directional comparisons based on scan direction. */
+    private enum Order {
+        ASC {
+            public boolean lt(int lhs, int rhs) {
+                return lhs < rhs;
+            }
+
+            public boolean gt(int lhs, int rhs) {
+                return lhs > rhs;
+            }
+
+            public byte inc(byte val) {
+                // TODO: what about over/underflow?
+                return (byte) (val + 1);
+            }
+
+            public boolean isMax(byte val) {
+                return val == (byte) 0xff;
+            }
+
+            public byte min() {
+                return 0;
+            }
+        },
+        DESC {
+            public boolean lt(int lhs, int rhs) {
+                return lhs > rhs;
+            }
+
+            public boolean gt(int lhs, int rhs) {
+                return lhs < rhs;
+            }
+
+            public byte inc(byte val) {
+                // TODO: what about over/underflow?
+                return (byte) (val - 1);
+            }
+
+            public boolean isMax(byte val) {
+                return val == 0;
+            }
+
+            public byte min() {
+                return (byte) 0xFF;
+            }
+        };
+
+        public static Order orderFor(boolean reverse) {
+            return reverse ? DESC : ASC;
+        }
+
+        /** Returns true when {@code lhs < rhs}. */
+        public abstract boolean lt(int lhs, int rhs);
+
+        /** Returns true when {@code lhs > rhs}. */
+        public abstract boolean gt(int lhs, int rhs);
+
+        /** Returns {@code val} advanced one step in this order's direction. */
+        public abstract byte inc(byte val);
+
+        /** Returns true when {@code val} is the maximum value. */
+        public abstract boolean isMax(byte val);
+
+        /** Returns the minimum value according to this ordering scheme. */
+        public abstract byte min();
+    }
+
+    /**
+     * @return a byte array greater than the given row that satisfies the fuzzy rule if one
+     *         exists, null otherwise
+     */
+    @VisibleForTesting
+    static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, int offset, int length,
+            byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
+        // To find the next "smallest" byte array that satisfies the fuzzy rule and is "greater" than
+        // the given one, we do the following:
+        // 1. set the values at all "fixed" positions to the values from fuzzyKeyBytes
+        // 2. if the given row did not increase during the first step, increase the value at
+        // the first "non-fixed" position (where it is not at the maximum already)
+
+        // It is easier to perform this on a copy of fuzzyKeyBytes, setting the "non-fixed" position
+        // values, than the other way around.
+        byte[] result =
+                Arrays.copyOf(fuzzyKeyBytes, length > fuzzyKeyBytes.length ? length : fuzzyKeyBytes.length);
+        if (reverse && length > fuzzyKeyBytes.length) {
+            // we need trailing 0xff's instead of trailing 0x00's
+            for (int i = fuzzyKeyBytes.length; i < result.length; i++) {
+                result[i] = (byte) 0xFF;
+            }
+        }
+        int toInc = -1;
+        final Order order = Order.orderFor(reverse);
+
+        boolean increased = false;
+        for (int i = 0; i < result.length; i++) {
+            if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* non-fixed */) {
+                result[i] = row[offset + i];
+                if (!order.isMax(row[offset + i])) {
+                    // this is "non-fixed" position and is not at max value, hence we can increase it
+                    toInc = i;
+                }
+            } else if (i < fuzzyKeyMeta.length && fuzzyKeyMeta[i] == -1 /* fixed */) {
+                if (order.lt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 0xFF))) {
+                    // if setting value for any fixed position increased the original array,
+                    // we are OK
+                    increased = true;
+                    break;
+                }
+
+                if (order.gt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 0xFF))) {
+                    // if setting value for any fixed position makes array "smaller", then just stop:
+                    // in case we found some non-fixed position to increase we will do it, otherwise
+                    // there's no "next" row key that satisfies fuzzy rule and "greater" than given row
+                    break;
+                }
+            }
+        }
+
+        if (!increased) {
+            if (toInc < 0) {
+                return null;
+            }
+            result[toInc] = order.inc(result[toInc]);
+
+            // Setting all "non-fixed" positions to zeroes to the right of the one we increased so
+            // that found "next" row key is the smallest possible
+            for (int i = toInc + 1; i < result.length; i++) {
+                if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* non-fixed */) {
+                    result[i] = order.min();
+                }
+            }
+        }
+
+        return reverse ? result : trimTrailingZeroes(result, fuzzyKeyMeta, toInc);
+    }
+
+    /**
+     * For a forward scanner, the next cell hint should not contain any trailing zeroes
+     * unless they are part of fuzzyKeyMeta; e.g. the hint
+     * '\x01\x01\x01\x00\x00'
+     * would skip the valid row '\x01\x01\x01'.
+     *
+     * @param result
+     * @param fuzzyKeyMeta
+     * @param toInc - position of incremented byte
+     * @return trimmed version of result
+     */
+    private static byte[] trimTrailingZeroes(byte[] result, byte[] fuzzyKeyMeta, int toInc) {
+        int off = fuzzyKeyMeta.length >= result.length ? result.length - 1 : fuzzyKeyMeta.length - 1;
+        for (; off >= 0; off--) {
+            if (fuzzyKeyMeta[off] != 0) break;
+        }
+        if (off < toInc) off = toInc;
+        byte[] retValue = new byte[off + 1];
+        System.arraycopy(result, 0, retValue, 0, retValue.length);
+        return retValue;
+    }
+
+    /**
+     * @return true if and only if the fields of the filter that are serialized are equal to the
+     *         corresponding fields in other. Used for testing.
+     */
+    boolean areSerializedFieldsEqual(Filter o) {
+        if (o == this) return true;
+        if (!(o instanceof FuzzyRowFilter)) return false;
+
+        FuzzyRowFilter other = (FuzzyRowFilter) o;
+        if (this.fuzzyKeysData.size() != other.fuzzyKeysData.size()) return false;
+        for (int i = 0; i < fuzzyKeysData.size(); ++i) {
+            Pair<byte[], byte[]> thisData = this.fuzzyKeysData.get(i);
+            Pair<byte[], byte[]> otherData = other.fuzzyKeysData.get(i);
+            if (!(Bytes.equals(thisData.getFirst(), otherData.getFirst()) && Bytes.equals(
+                    thisData.getSecond(), otherData.getSecond()))) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
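
A minimal usage sketch for the filter above (not part of the commit; the table
layout and values follow the hypothetical userId_actionId_year_month example
from the class javadoc):

    // requires: java.util.Arrays, org.apache.hadoop.hbase.client.Scan,
    // org.apache.hadoop.hbase.util.Bytes, org.apache.hadoop.hbase.util.Pair
    // Match any user who performed action "99" in January of any year.
    byte[] rowKey = Bytes.toBytes("????_99_????_01");
    byte[] fuzzyInfo = new byte[] {
            1, 1, 1, 1, // "????" userId: non-fixed
            0, 0, 0, 0, // "_99_": fixed
            1, 1, 1, 1, // "????" year: non-fixed
            0, 0, 0 };  // "_01": fixed
    Scan scan = new Scan();
    scan.setFilter(new FuzzyRowFilter(
            Arrays.asList(new Pair<byte[], byte[]>(rowKey, fuzzyInfo))));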

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/hbase/filter/UnsafeAccess.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/filter/UnsafeAccess.java b/storage/src/main/java/org/apache/kylin/storage/hbase/filter/UnsafeAccess.java
new file mode 100644
index 0000000..4d336b0
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/filter/UnsafeAccess.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.filter;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.ByteOrder;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+import sun.misc.Unsafe;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class UnsafeAccess {
+
+    private static final Log LOG = LogFactory.getLog(UnsafeAccess.class);
+
+    public static final Unsafe theUnsafe;
+
+    /**
+     * The offset to the first element in a byte array.
+     */
+    public static final int BYTE_ARRAY_BASE_OFFSET;
+    private static boolean unaligned = false;
+
+    static {
+        theUnsafe = (Unsafe) AccessController.doPrivileged(new PrivilegedAction<Object>() {
+            @Override
+            public Object run() {
+                try {
+                    Field f = Unsafe.class.getDeclaredField("theUnsafe");
+                    f.setAccessible(true);
+                    return f.get(null);
+                } catch (Throwable e) {
+                    LOG.warn("sun.misc.Unsafe is not accessible", e);
+                }
+                return null;
+            }
+        });
+
+        if (theUnsafe != null) {
+            BYTE_ARRAY_BASE_OFFSET = theUnsafe.arrayBaseOffset(byte[].class);
+            try {
+                // Using java.nio.Bits#unaligned() to check for unaligned-access capability
+                Class<?> clazz = Class.forName("java.nio.Bits");
+                Method m = clazz.getDeclaredMethod("unaligned");
+                m.setAccessible(true);
+                unaligned = (boolean) m.invoke(null);
+            } catch (Exception e) {
+                unaligned = false;
+            }
+        } else {
+            BYTE_ARRAY_BASE_OFFSET = -1;
+        }
+    }
+
+    private UnsafeAccess() {
+    }
+
+    public static boolean isAvailable() {
+        return theUnsafe != null;
+    }
+
+    /**
+     * @return true when the running JVM has sun.misc.Unsafe available and the underlying
+     * system has unaligned-access capability.
+     */
+    public static boolean unaligned() {
+        return unaligned;
+    }
+
+    public static final boolean littleEndian = ByteOrder.nativeOrder()
+            .equals(ByteOrder.LITTLE_ENDIAN);
+}
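
A rough sketch of how callers in this package are meant to use the guard above
(readLong here is a hypothetical helper, not part of the commit; Bytes.toLong
is the standard org.apache.hadoop.hbase.util.Bytes fallback):

    // Take the word-at-a-time unsafe path only when Unsafe is present and
    // the platform tolerates unaligned reads; otherwise read byte-by-byte.
    static long readLong(byte[] buf, int off) {
        if (UnsafeAccess.isAvailable() && UnsafeAccess.unaligned()) {
            return UnsafeAccess.theUnsafe.getLong(buf,
                    UnsafeAccess.BYTE_ARRAY_BASE_OFFSET + (long) off);
        }
        return Bytes.toLong(buf, off); // safe fallback
    }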

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
index f7fcef1..50069a1 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java
@@ -1,115 +1,112 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexHBaseTest extends HBaseMetadataTestCase {
-
-    IIInstance ii;
-    IISegment seg;
-    HConnection hconn;
-
-    TableRecordInfo info;
-
-    @Before
-    public void setup() throws Exception {
-        this.createTestMetadata();
-
-        this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii");
-        this.seg = ii.getFirstSegment();
-
-        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
-        hconn = HConnectionManager.createConnection(hconf);
-
-        this.info = new TableRecordInfo(seg);
-    }
-
-    @After
-    public void after() throws Exception {
-        this.cleanupTestMetadata();
-    }
-
-    @Test
-    public void testLoad() throws Exception {
-
-        String tableName = seg.getStorageLocationIdentifier();
-        IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
-
-        List<Slice> slices = Lists.newArrayList();
-        HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
-        try {
-            for (Slice slice : codec.decodeKeyValue(kvIterator)) {
-                slices.add(slice);
-            }
-        } finally {
-            kvIterator.close();
-        }
-
-        List<TableRecord> records = iterateRecords(slices);
-        dump(records);
-        System.out.println(records.size() + " records");
-    }
-
-    private List<TableRecord> iterateRecords(List<Slice> slices) {
-        List<TableRecord> records = Lists.newArrayList();
-        for (Slice slice : slices) {
-            for (RawTableRecord rec : slice) {
-                records.add(new TableRecord((RawTableRecord) rec.clone(), info));
-            }
-        }
-        return records;
-    }
-
-    private void dump(Iterable<TableRecord> records) {
-        for (TableRecord rec : records) {
-            System.out.println(rec.toString());
-
-            byte[] x = rec.getBytes();
-            String y = BytesUtil.toReadableText(x);
-            System.out.println(y);
-            System.out.println();
-        }
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexHBaseTest extends HBaseMetadataTestCase {
+
+    IIInstance ii;
+    IISegment seg;
+    Connection hconn;
+
+    TableRecordInfo info;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+
+        this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii");
+        this.seg = ii.getFirstSegment();
+
+        this.hconn = HBaseConnection.get();
+
+        this.info = new TableRecordInfo(seg);
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testLoad() throws Exception {
+
+        String tableName = seg.getStorageLocationIdentifier();
+        IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
+
+        List<Slice> slices = Lists.newArrayList();
+        HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
+        try {
+            for (Slice slice : codec.decodeKeyValue(kvIterator)) {
+                slices.add(slice);
+            }
+        } finally {
+            kvIterator.close();
+        }
+
+        List<TableRecord> records = iterateRecords(slices);
+        dump(records);
+        System.out.println(records.size() + " records");
+    }
+
+    private List<TableRecord> iterateRecords(List<Slice> slices) {
+        List<TableRecord> records = Lists.newArrayList();
+        for (Slice slice : slices) {
+            for (RawTableRecord rec : slice) {
+                records.add(new TableRecord((RawTableRecord) rec.clone(), info));
+            }
+        }
+        return records;
+    }
+
+    private void dump(Iterable<TableRecord> records) {
+        for (TableRecord rec : records) {
+            System.out.println(rec.toString());
+
+            byte[] x = rec.getBytes();
+            String y = BytesUtil.toReadableText(x);
+            System.out.println(y);
+            System.out.println();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
index 0454b4c..3ace91e 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.kv.RowConstants;
@@ -223,102 +224,46 @@ public class AggregateRegionObserverTest {
             this.input = cellInputs;
         }
 
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.hadoop.hbase.regionserver.InternalScanner#next(java.util
-         * .List)
-         */
         @Override
         public boolean next(List<Cell> results) throws IOException {
             return nextRaw(results);
         }
 
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.hadoop.hbase.regionserver.InternalScanner#next(java.util
-         * .List, int)
-         */
         @Override
-        public boolean next(List<Cell> result, int limit) throws IOException {
+        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
             return next(result);
         }
 
-        /*
-         * (non-Javadoc)
-         * 
-         * @see org.apache.hadoop.hbase.regionserver.InternalScanner#close()
-         */
         @Override
         public void close() throws IOException {
 
         }
 
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
-         */
         @Override
         public HRegionInfo getRegionInfo() {
             return null;
         }
 
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
-         */
         @Override
         public boolean isFilterDone() throws IOException {
             return false;
         }
 
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
-         */
         @Override
         public boolean reseek(byte[] row) throws IOException {
             return false;
         }
 
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
-         */
         @Override
         public long getMaxResultSize() {
             return 0;
         }
 
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
-         */
         @Override
         public long getMvccReadPoint() {
             return 0;
         }
 
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw(java.util
-         * .List)
-         */
         @Override
         public boolean nextRaw(List<Cell> result) throws IOException {
             if (i < input.size()) {
@@ -328,18 +273,15 @@ public class AggregateRegionObserverTest {
             return i < input.size();
         }
 
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw(java.util
-         * .List, int)
-         */
         @Override
-        public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+        public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
             return nextRaw(result);
         }
 
+        @Override
+        public int getBatch() {
+            return -1;
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java b/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java
index d17cfa6..b1f6626 100644
--- a/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/minicluster/HiveMiniClusterTest.java
@@ -27,7 +27,6 @@ import java.sql.SQLException;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.service.HiveInterface;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 
@@ -47,7 +46,7 @@ public class HiveMiniClusterTest extends HiveJDBCClientTest {
     public static final File HIVE_WAREHOUSE_DIR = new File(HIVE_BASE_DIR + "/warehouse");
     public static final File HIVE_TESTDATA_DIR = new File(HIVE_BASE_DIR + "/testdata");
     public static final File HIVE_HADOOP_TMP_DIR = new File(HIVE_BASE_DIR + "/hadooptmp");
-    protected HiveInterface client;
+    //protected HiveInterface client;
 
     protected MiniDFSCluster miniDFS;
     protected MiniMRCluster miniMR;


[3/3] kylin git commit: KYLIN-920 & KYLIN-782 & KYLIN-1422 Upgrade to HBase 1.1 (with help from murkrishn)

Posted by ma...@apache.org.
KYLIN-920 & KYLIN-782 & KYLIN-1422 Upgrade to HBase 1.1 (with help from murkrishn <mu...@ebay.com>)


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4313900e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4313900e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4313900e

Branch: refs/heads/1.x-HBase1.1.3
Commit: 4313900e67076c6d517d29bc0b78095e0e37b4ad
Parents: f6bc652
Author: Yang Li <li...@apache.org>
Authored: Sun Aug 16 20:22:13 2015 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Mar 2 17:15:28 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |   5 +
 .../common/persistence/HBaseConnection.java     |  31 +-
 .../common/persistence/HBaseResourceStore.java  |  31 +-
 .../common/util/HBaseRegionSizeCalculator.java  |  41 +-
 .../kylin/common/util/BasicHadoopTest.java      |  11 +-
 .../kylin/job/cube/GarbageCollectionStep.java   |  22 +-
 .../kylin/job/hadoop/cube/CubeHFileJob.java     |  22 +-
 .../job/hadoop/cube/StorageCleanupJob.java      |  26 +-
 .../kylin/job/hadoop/hbase/CreateHTableJob.java |   8 +-
 .../hadoop/invertedindex/IICreateHFileJob.java  |  22 +-
 .../hadoop/invertedindex/IICreateHTableJob.java |  11 +-
 .../apache/kylin/job/tools/CleanHtableCLI.java  |   8 +-
 .../kylin/job/tools/CubeMigrationCLI.java       |  67 +-
 .../kylin/job/tools/DeployCoprocessorCLI.java   | 769 ++++++++++---------
 .../job/tools/GridTableHBaseBenchmark.java      |  37 +-
 .../kylin/job/tools/HtableAlterMetadataCLI.java |   8 +-
 .../apache/kylin/job/tools/RowCounterCLI.java   |  11 +-
 .../kylin/job/BuildCubeWithEngineTest.java      |   3 +-
 .../org/apache/kylin/job/ExportHBaseData.java   |  18 +-
 .../kylin/job/hadoop/hbase/TestHbaseClient.java |  13 +-
 .../kylin/job/tools/HBaseRowDigestTest.java     |  11 +-
 monitor/pom.xml                                 |   6 +
 .../kylin/monitor/MonitorMetaManager.java       |  49 +-
 pom.xml                                         |  20 +-
 .../apache/kylin/rest/service/AclService.java   |  38 +-
 .../apache/kylin/rest/service/CubeService.java  |  35 +-
 .../apache/kylin/rest/service/QueryService.java |  21 +-
 .../apache/kylin/rest/service/UserService.java  |  27 +-
 .../storage/filter/BitMapFilterEvaluator.java   |   1 -
 .../storage/hbase/CubeSegmentTupleIterator.java |  50 +-
 .../kylin/storage/hbase/CubeStorageEngine.java  |   4 +-
 .../storage/hbase/HBaseClientKVIterator.java    | 187 ++---
 .../hbase/InvertedIndexStorageEngine.java       | 114 +--
 .../kylin/storage/hbase/PingHBaseCLI.java       | 179 ++---
 .../storage/hbase/RegionScannerAdapter.java     |  10 +-
 .../hbase/SerializedHBaseTupleIterator.java     |   4 +-
 .../endpoint/EndpointTupleIterator.java         |  15 +-
 .../hbase/coprocessor/endpoint/IIEndpoint.java  |   2 +-
 .../observer/AggregateRegionObserver.java       |   2 +-
 .../observer/AggregationScanner.java            |  14 +-
 .../observer/ObserverAggregationCache.java      |  10 +-
 .../coprocessor/observer/ObserverEnabler.java   |   4 +-
 .../storage/hbase/filter/FuzzyRowFilter.java    | 636 +++++++++++++++
 .../storage/hbase/filter/UnsafeAccess.java      |  96 +++
 .../storage/hbase/InvertedIndexHBaseTest.java   | 227 +++---
 .../observer/AggregateRegionObserverTest.java   |  72 +-
 .../minicluster/HiveMiniClusterTest.java        |   3 +-
 47 files changed, 1877 insertions(+), 1124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index c58d419..c5f4bfe 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -530,6 +530,11 @@ public class KylinConfig {
     public int getHBaseScanMaxResultSize() {
         return Integer.parseInt(this.getOptional("kylin.hbase.scan.max_result_size", "" + (5 * 1024 * 1024))); // 5 MB
     }
+    
+    public String getPatchedFuzzyRowFilterVersion() {
+        return this.getOptional("kylin.hbase.filter.fuzzy.row.filter.version", "1.1.3");
+    }
 
     public boolean isQueryIgnoreUnknownFunction() {
         return Boolean.parseBoolean(this.getOptional("kylin.query.ignore_unknown_function", "false"));
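
If the default needs to be overridden, the corresponding kylin.properties entry
would presumably be (property name taken from the getter above; the value shown
is its built-in default):

    kylin.hbase.filter.fuzzy.row.filter.version=1.1.3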

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
index 5b8fe54..a3d8166 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java
@@ -27,9 +27,10 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,13 +43,13 @@ public class HBaseConnection {
 
     private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
 
-    private static final Map<String, HConnection> ConnPool = new ConcurrentHashMap<String, HConnection>();
+    private static final Map<String, Connection> ConnPool = new ConcurrentHashMap<String, Connection>();
 
     static {
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
             public void run() {
-                for (HConnection conn : ConnPool.values()) {
+                for (Connection conn : ConnPool.values()) {
                     try {
                         conn.close();
                     } catch (IOException e) {
@@ -62,16 +63,20 @@ public class HBaseConnection {
     public static void clearCache() {
         ConnPool.clear();
     }
+    
+    public static Connection get() {
+        return get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+    }
 
-    public static HConnection get(String url) {
+    public static Connection get(String url) {
 
-        HConnection connection = ConnPool.get(url);
+        Connection connection = ConnPool.get(url);
         try {
             // I don't use DCL since recreating a connection is not a big issue.
             if (connection == null) {
                 // find configuration
                 Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
-                connection = HConnectionManager.createConnection(conf);
+                connection = ConnectionFactory.createConnection(conf);
                 ConnPool.put(url, connection);
             }
         } catch (Throwable t) {
@@ -85,13 +90,13 @@ public class HBaseConnection {
         createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families);
     }
 
-    public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException {
-        HBaseAdmin hbase = new HBaseAdmin(conn);
+    public static void createHTableIfNeeded(Connection conn, String tableName, String... families) throws IOException {
+        Admin admin = conn.getAdmin();
 
         try {
             boolean tableExist = false;
             try {
-                hbase.getTableDescriptor(TableName.valueOf(tableName));
+                admin.getTableDescriptor(TableName.valueOf(tableName));
                 tableExist = true;
             } catch (TableNotFoundException e) {
             }
@@ -112,11 +117,11 @@ public class HBaseConnection {
                     desc.addFamily(fd);
                 }
             }
-            hbase.createTable(desc);
+            admin.createTable(desc);
 
             logger.debug("HTable '" + tableName + "' created");
         } finally {
-            hbase.close();
+            admin.close();
         }
     }
 }
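
For reference, a minimal sketch of the HBase 1.x client idiom this file moves
to (the table name "kylin_metadata" and conf are illustrative; Connection,
ConnectionFactory, Admin and Table are the replacement APIs from the diff):

    Connection conn = ConnectionFactory.createConnection(conf);
    Admin admin = conn.getAdmin();
    try {
        if (!admin.tableExists(TableName.valueOf("kylin_metadata"))) {
            // create the table, as createHTableIfNeeded() does above
        }
    } finally {
        admin.close();
    }
    Table table = conn.getTable(TableName.valueOf("kylin_metadata"));
    try {
        Put put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes("v"));
        table.put(put); // no flushCommits() needed: Table writes synchronously,
                        // client-side buffering moved to BufferedMutator
    } finally {
        table.close();
    }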

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
index d1ff27a..0c06847 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java
@@ -34,13 +34,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
@@ -77,7 +78,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     //    final Map<String, String> tableNameMap; // path prefix ==> HBase table name
 
-    private HConnection getConnection() throws IOException {
+    private Connection getConnection() throws IOException {
         return HBaseConnection.get(hbaseUrl);
     }
 
@@ -114,7 +115,7 @@ public class HBaseResourceStore extends ResourceStore {
 
         ArrayList<String> result = new ArrayList<String>();
 
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         Scan scan = new Scan(startRow, endRow);
         scan.setFilter(new KeyOnlyFilter());
         try {
@@ -150,7 +151,7 @@ public class HBaseResourceStore extends ResourceStore {
         scan.addColumn(B_FAMILY, B_COLUMN);
         tuneScanParameters(scan);
 
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         List<RawResource> result = Lists.newArrayList();
         try {
             ResultScanner scanner = table.getScanner(scan);
@@ -219,13 +220,12 @@ public class HBaseResourceStore extends ResourceStore {
         IOUtils.copy(content, bout);
         bout.close();
 
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         try {
             byte[] row = Bytes.toBytes(resPath);
             Put put = buildPut(resPath, ts, row, bout.toByteArray(), table);
 
             table.put(put);
-            table.flushCommits();
         } finally {
             IOUtils.closeQuietly(table);
         }
@@ -233,7 +233,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     @Override
     protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException {
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         try {
             byte[] row = Bytes.toBytes(resPath);
             byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS);
@@ -245,8 +245,6 @@ public class HBaseResourceStore extends ResourceStore {
                 throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + real + ", but it is " + oldTS);
             }
 
-            table.flushCommits();
-
             return newTS;
         } finally {
             IOUtils.closeQuietly(table);
@@ -255,11 +253,10 @@ public class HBaseResourceStore extends ResourceStore {
 
     @Override
     protected void deleteResourceImpl(String resPath) throws IOException {
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         try {
             Delete del = new Delete(Bytes.toBytes(resPath));
             table.delete(del);
-            table.flushCommits();
         } finally {
             IOUtils.closeQuietly(table);
         }
@@ -284,7 +281,7 @@ public class HBaseResourceStore extends ResourceStore {
                 scan.addColumn(B_FAMILY, B_COLUMN_TS);
         }
 
-        HTableInterface table = getConnection().getTable(getAllInOneTableName());
+        Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName()));
         try {
             ResultScanner scanner = table.getScanner(scan);
             Result result = null;
@@ -303,7 +300,7 @@ public class HBaseResourceStore extends ResourceStore {
         return endRow;
     }
 
-    private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
+    private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException {
         Path redirectPath = bigCellHDFSPath(resPath);
         Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
         FileSystem fileSystem = FileSystem.get(hconf);
@@ -329,7 +326,7 @@ public class HBaseResourceStore extends ResourceStore {
         return redirectPath;
     }
 
-    private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException {
+    private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException {
         int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize();
         if (content.length > kvSizeLimit) {
             writeLargeCellToHdfs(resPath, content, table);
@@ -337,8 +334,8 @@ public class HBaseResourceStore extends ResourceStore {
         }
 
         Put put = new Put(row);
-        put.add(B_FAMILY, B_COLUMN, content);
-        put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
+        put.addColumn(B_FAMILY, B_COLUMN, content);
+        put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts));
 
         return put;
     }
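
The flushCommits() deletions above reflect the HBase 1.x client model: Table (the
replacement for HTableInterface) does not buffer writes, so every put() is sent
immediately and there is nothing left to flush. Below is a minimal sketch of the
new write path; the table, family and qualifier names are illustrative, and
BufferedMutator is the 1.x idiom for callers that still want client-side batching:

    import java.io.IOException;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class WriteSketch {
        static void write(Connection connection) throws IOException {
            TableName name = TableName.valueOf("kylin_metadata");
            // unbuffered: put() returns after the mutation reaches the server
            try (Table table = connection.getTable(name)) {
                Put put = new Put(Bytes.toBytes("row1"));
                put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes("v"));
                table.put(put);
            }
            // buffered: the replacement for setAutoFlush(false) + flushCommits()
            try (BufferedMutator mutator = connection.getBufferedMutator(name)) {
                mutator.mutate(new Put(Bytes.toBytes("row2"))
                        .addColumn(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes("v")));
            } // close() flushes any pending mutations
        }
    }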

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
index fccd042..80f0502 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
@@ -23,19 +23,24 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,30 +61,31 @@ public class HBaseRegionSizeCalculator {
     /**
      * Computes size of each region for table and given column families.
      * */
-    public HBaseRegionSizeCalculator(HTable table) throws IOException {
-        this(table, new HBaseAdmin(table.getConfiguration()));
-    }
-
-    /** Constructor for unit testing */
-    HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException {
-
+    public HBaseRegionSizeCalculator(String tableName, Connection hbaseConnection) throws IOException {
+        Table table = null;
+        Admin admin = null;
+
         try {
+            table = hbaseConnection.getTable(TableName.valueOf(tableName));
+            admin = hbaseConnection.getAdmin();
+
             if (!enabled(table.getConfiguration())) {
                 logger.info("Region size calculation disabled.");
                 return;
             }
 
-            logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\".");
+            logger.info("Calculating region sizes for table \"" + table.getName() + "\".");
 
             // Get regions for table.
-            Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet();
+            RegionLocator regionLocator = hbaseConnection.getRegionLocator(table.getName());
+            List<HRegionLocation> regionLocationList = regionLocator.getAllRegionLocations();
             Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
 
-            for (HRegionInfo regionInfo : tableRegionInfos) {
-                tableRegions.add(regionInfo.getRegionName());
+            for (HRegionLocation hRegionLocation : regionLocationList) {
+                tableRegions.add(hRegionLocation.getRegionInfo().getRegionName());
             }
 
-            ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus();
+            ClusterStatus clusterStatus = admin.getClusterStatus();
             Collection<ServerName> servers = clusterStatus.getServers();
             final long megaByte = 1024L * 1024L;
 
@@ -103,7 +109,8 @@ public class HBaseRegionSizeCalculator {
                 }
             }
         } finally {
-            hBaseAdmin.close();
+            IOUtils.closeQuietly(table);
+            IOUtils.closeQuietly(admin);
         }
 
     }
@@ -129,7 +136,9 @@ public class HBaseRegionSizeCalculator {
         return Collections.unmodifiableMap(sizeMap);
     }
 
+
     public Map<byte[], Pair<Integer, Integer>> getRegionHFileCountMap() {
         return Collections.unmodifiableMap(countMap);
     }
 }
+
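
HTable.getRegionLocations() has no equivalent on the Table interface; region
metadata now comes from a RegionLocator, as the rewritten constructor above shows.
A minimal sketch of that lookup (the table name is illustrative):

    import java.io.IOException;

    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.RegionLocator;

    public class RegionSketch {
        static void listRegions(Connection connection) throws IOException {
            TableName name = TableName.valueOf("KYLIN_CUBE");
            try (RegionLocator locator = connection.getRegionLocator(name)) {
                // each HRegionLocation pairs a region with its hosting server
                for (HRegionLocation location : locator.getAllRegionLocations()) {
                    System.out.println(location.getRegionInfo().getRegionNameAsString()
                            + " on " + location.getServerName());
                }
            }
        }
    }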

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
index 6d2762c..481fc6c 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java
@@ -21,12 +21,11 @@ package org.apache.kylin.common.util;
 import java.io.File;
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -56,16 +55,14 @@ public class BasicHadoopTest {
         cf.setBlocksize(4 * 1024 * 1024); // set to 4MB
         tableDesc.addFamily(cf);
 
-        Configuration conf = HBaseConfiguration.create();
-        HBaseAdmin admin = new HBaseAdmin(conf);
+        Admin admin = HBaseConnection.get().getAdmin();
         admin.createTable(tableDesc);
         admin.close();
     }
 
     @Test
     public void testRetriveHtableHost() throws IOException {
-        Configuration conf = HBaseConfiguration.create();
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Admin hbaseAdmin = HBaseConnection.get().getAdmin();
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables();
         for (HTableDescriptor table : tableDescriptors) {
             String value = table.getValue("KYLIN_HOST");

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
index b076aa7..3852eeb 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
@@ -24,14 +24,13 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.job.cmd.ShellCmdOutput;
 import org.apache.kylin.job.common.HiveCmdBuilder;
@@ -106,19 +105,18 @@ public class GarbageCollectionStep extends AbstractExecutable {
         List<String> oldTables = getOldHTables();
         if (oldTables != null && oldTables.size() > 0) {
             String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
-            Configuration conf = HBaseConfiguration.create();
-            HBaseAdmin admin = null;
+            Admin admin = null;
             try {
-                admin = new HBaseAdmin(conf);
+                admin = HBaseConnection.get().getAdmin();
                 for (String table : oldTables) {
-                    if (admin.tableExists(table)) {
-                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
+                    if (admin.tableExists(TableName.valueOf(table))) {
+                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(table));
                         String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
                         if (metadataUrlPrefix.equalsIgnoreCase(host)) {
-                            if (admin.isTableEnabled(table)) {
-                                admin.disableTable(table);
+                            if (admin.isTableEnabled(TableName.valueOf(table))) {
+                                admin.disableTable(TableName.valueOf(table));
                             }
-                            admin.deleteTable(table);
+                            admin.deleteTable(TableName.valueOf(table));
                             logger.debug("Dropped HBase table " + table);
                             output.append("Dropped HBase table " + table + " \n");
                         } else {
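
Every string-typed table argument on the old HBaseAdmin is TableName-typed on
Admin, hence the repeated TableName.valueOf() wrapping in this and the following
files. The disable-then-delete flow used here, as a standalone sketch (method and
class names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;

    public class DropTableSketch {
        static void dropIfExists(Connection connection, String htableName) throws IOException {
            TableName table = TableName.valueOf(htableName);
            try (Admin admin = connection.getAdmin()) {
                if (admin.tableExists(table)) {
                    if (admin.isTableEnabled(table)) {
                        // a table must be disabled before it can be deleted
                        admin.disableTable(table);
                    }
                    admin.deleteTable(table);
                }
            }
        }
    }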

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
index d5b83ef..55d926a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
@@ -21,12 +21,16 @@ package org.apache.kylin.job.hadoop.cube;
 import java.io.IOException;
 
 import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.SequenceFile;
@@ -38,6 +42,7 @@ import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.job.constant.BatchConstants;
@@ -54,6 +59,8 @@ public class CubeHFileJob extends AbstractHadoopJob {
 
     public int run(String[] args) throws Exception {
         Options options = new Options();
+        Connection connection = null;
+        Table table = null;
 
         try {
             options.addOption(OPTION_JOB_NAME);
@@ -90,12 +97,16 @@ public class CubeHFileJob extends AbstractHadoopJob {
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 
             String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
-            HTable htable = new HTable(conf, tableName);
+            connection = HBaseConnection.get();
+            table = connection.getTable(TableName.valueOf(tableName));
+            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName));
 
-            // Automatic config !
-            HFileOutputFormat.configureIncrementalLoad(job, htable);
+
+            // Automatic config!
+            HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
             reconfigurePartitions(conf, partitionFilePath);
 
+
             // set block replication to 3 for hfiles
             conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
 
@@ -107,6 +118,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
             printUsage(options);
             throw e;
         } finally {
+            IOUtils.closeQuietly(table);
             if (job != null)
                 cleanupTempConfFile(job.getConfiguration());
         }
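
HFileOutputFormat is removed in HBase 1.x; HFileOutputFormat2.configureIncrementalLoad()
takes the Table for the column-family schema and a separate RegionLocator for the
split points. A minimal job-setup sketch (class, method and table names are
illustrative):

    import java.io.IOException;

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.mapreduce.Job;

    public class HFileJobSketch {
        static void configure(Job job, String htableName) throws IOException {
            TableName name = TableName.valueOf(htableName);
            try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
                 Table table = connection.getTable(name);
                 RegionLocator regionLocator = connection.getRegionLocator(name)) {
                // the table supplies family descriptors (compression, encoding);
                // the locator supplies region boundaries for total-order partitioning
                HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
            }
        }
    }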

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
index 0c87fc6..70705b7 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
@@ -18,6 +18,13 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -28,10 +35,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -52,13 +61,6 @@ import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
 /**
  * @author ysong1
  */
@@ -109,7 +111,7 @@ public class StorageCleanupJob extends AbstractHadoopJob {
         IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
 
         // get all kylin hbase tables
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Admin hbaseAdmin = HBaseConnection.get().getAdmin();
         String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
         List<String> allTablesNeedToBeDropped = new ArrayList<String>();
@@ -143,9 +145,9 @@ public class StorageCleanupJob extends AbstractHadoopJob {
             // drop tables
             for (String htableName : allTablesNeedToBeDropped) {
                 log.info("Deleting HBase table " + htableName);
-                if (hbaseAdmin.tableExists(htableName)) {
-                    hbaseAdmin.disableTable(htableName);
-                    hbaseAdmin.deleteTable(htableName);
+                if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) {
+                    hbaseAdmin.disableTable(TableName.valueOf(htableName));
+                    hbaseAdmin.deleteTable(TableName.valueOf(htableName));
                     log.info("Deleted HBase table " + htableName);
                 } else {
                     log.info("HBase table" + htableName + " does not exist");

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
index 027c0ca..9f5e062 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
@@ -25,11 +25,10 @@ import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
@@ -42,6 +41,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -81,7 +81,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix());
 
         Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
-        HBaseAdmin admin = new HBaseAdmin(conf);
+        Admin admin = HBaseConnection.get().getAdmin();
 
         try {
             if (User.isHBaseSecurityEnabled(conf)) {
@@ -139,7 +139,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
 
             byte[][] splitKeys = getSplits(conf, partitionFilePath);
 
-            if (admin.tableExists(tableName)) {
+            if (admin.tableExists(TableName.valueOf(tableName))) {
                 // admin.disableTable(tableName);
                 // admin.deleteTable(tableName);
                 throw new RuntimeException("HBase table " + tableName + " exists!");

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
index c032bbc..fa42148 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
@@ -19,17 +19,20 @@
 package org.apache.kylin.job.hadoop.invertedindex;
 
 import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.slf4j.Logger;
@@ -45,6 +48,8 @@ public class IICreateHFileJob extends AbstractHadoopJob {
 
     public int run(String[] args) throws Exception {
         Options options = new Options();
+        Connection connection = null;
+        Table table = null;
 
         try {
             options.addOption(OPTION_JOB_NAME);
@@ -69,8 +74,11 @@ public class IICreateHFileJob extends AbstractHadoopJob {
             job.setMapOutputValueClass(KeyValue.class);
 
             String tableName = getOptionValue(OPTION_HTABLE_NAME);
-            HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName);
-            HFileOutputFormat.configureIncrementalLoad(job, htable);
+
+            connection = HBaseConnection.get();
+            table = connection.getTable(TableName.valueOf(tableName));
+            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName));
+            HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
 
             this.deletePath(job.getConfiguration(), output);
 
@@ -78,6 +86,8 @@ public class IICreateHFileJob extends AbstractHadoopJob {
         } catch (Exception e) {
             printUsage(options);
             throw e;
+        } finally {
+            IOUtils.closeQuietly(table);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
index 32d065a..63777ef 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
@@ -24,11 +24,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.invertedindex.IIInstance;
@@ -78,10 +79,10 @@ public class IICreateHTableJob extends AbstractHadoopJob {
             DeployCoprocessorCLI.deployCoprocessor(tableDesc);
 
             // drop the table first
-            HBaseAdmin admin = new HBaseAdmin(conf);
-            if (admin.tableExists(tableName)) {
-                admin.disableTable(tableName);
-                admin.deleteTable(tableName);
+            Admin admin = HBaseConnection.get().getAdmin();
+            if (admin.tableExists(TableName.valueOf(tableName))) {
+                admin.disableTable(TableName.valueOf(tableName));
+                admin.deleteTable(TableName.valueOf(tableName));
             }
 
             // create table

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
index b6e5af5..7fc1d72 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
@@ -21,11 +21,10 @@ package org.apache.kylin.job.tools;
 import java.io.IOException;
 
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,8 +53,7 @@ public class CleanHtableCLI extends AbstractHadoopJob {
     }
 
     private void clean() throws IOException {
-        Configuration conf = HBaseConfiguration.create();
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Admin hbaseAdmin = HBaseConnection.get().getAdmin();
 
         for (HTableDescriptor descriptor : hbaseAdmin.listTables()) {
             String name = descriptor.getNameAsString().toLowerCase();

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
index 44bc2c3..4c25d8b 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java
@@ -18,15 +18,31 @@
 
 package org.apache.kylin.job.tools;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hdfs.web.JsonUtil;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.*;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -47,11 +63,6 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Created by honma on 9/3/14.
  * <p/>
@@ -71,7 +82,7 @@ public class CubeMigrationCLI {
     private static ResourceStore srcStore;
     private static ResourceStore dstStore;
     private static FileSystem hdfsFS;
-    private static HBaseAdmin hbaseAdmin;
+    private static Admin hbaseAdmin;
 
     public static final String ACL_INFO_FAMILY = "i";
     private static final String ACL_TABLE_NAME = "_acl";
@@ -117,8 +128,7 @@ public class CubeMigrationCLI {
 
         checkAndGetHbaseUrl();
 
-        Configuration conf = HBaseConfiguration.create();
-        hbaseAdmin = new HBaseAdmin(conf);
+        hbaseAdmin = HBaseConnection.get().getAdmin();
 
         hdfsFS = FileSystem.get(new Configuration());
 
@@ -141,6 +151,8 @@ public class CubeMigrationCLI {
         } else {
             showOpts();
         }
+
+        IOUtils.closeQuietly(hbaseAdmin);
     }
 
     public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
@@ -304,10 +316,10 @@ public class CubeMigrationCLI {
         case CHANGE_HTABLE_HOST: {
             String tableName = (String) opt.params[0];
             HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-            hbaseAdmin.disableTable(tableName);
+            hbaseAdmin.disableTable(TableName.valueOf(tableName));
             desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix());
-            hbaseAdmin.modifyTable(tableName, desc);
-            hbaseAdmin.enableTable(tableName);
+            hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+            hbaseAdmin.enableTable(TableName.valueOf(tableName));
             logger.info("CHANGE_HTABLE_HOST is completed");
             break;
         }
@@ -418,14 +430,14 @@ public class CubeMigrationCLI {
             Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
             ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer);
             String projUUID = project.getUuid();
-            HTableInterface srcAclHtable = null;
-            HTableInterface destAclHtable = null;
+            Table srcAclHtable = null;
+            Table destAclHtable = null;
             try {
-                srcAclHtable = HBaseConnection.get(srcConfig.getMetadataUrl()).getTable(srcConfig.getMetadataUrlPrefix() + "_acl");
-                destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(dstConfig.getMetadataUrlPrefix() + "_acl");
+                srcAclHtable = HBaseConnection.get(srcConfig.getMetadataUrl()).getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + "_acl"));
+                destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + "_acl"));
 
                 // cube acl
-                Result result  = srcAclHtable.get(new Get(Bytes.toBytes(cubeId)));
+                Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId)));
                 if (result.listCells() != null) {
                     for (Cell cell : result.listCells()) {
                         byte[] family = CellUtil.cloneFamily(cell);
@@ -438,11 +450,10 @@ public class CubeMigrationCLI {
                             value = Bytes.toBytes(valueString);
                         }
                         Put put = new Put(Bytes.toBytes(cubeId));
-                        put.add(family, column, value);
+                        put.addColumn(family, column, value);
                         destAclHtable.put(put);
                     }
                 }
-                destAclHtable.flushCommits();
             } finally {
                 IOUtils.closeQuietly(srcAclHtable);
                 IOUtils.closeQuietly(destAclHtable);
@@ -469,10 +480,10 @@ public class CubeMigrationCLI {
         case CHANGE_HTABLE_HOST: {
             String tableName = (String) opt.params[0];
             HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-            hbaseAdmin.disableTable(tableName);
+            hbaseAdmin.disableTable(TableName.valueOf(tableName));
             desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix());
-            hbaseAdmin.modifyTable(tableName, desc);
-            hbaseAdmin.enableTable(tableName);
+            hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+            hbaseAdmin.enableTable(TableName.valueOf(tableName));
             break;
         }
         case COPY_FILE_IN_META: {
@@ -502,13 +513,11 @@ public class CubeMigrationCLI {
         case COPY_ACL: {
             String cubeId = (String) opt.params[0];
             String modelId = (String) opt.params[1];
-            HTableInterface destAclHtable = null;
+            Table destAclHtable = null;
             try {
-                destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(dstConfig.getMetadataUrlPrefix() + "_acl");
-
+                destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + "_acl"));
                 destAclHtable.delete(new Delete(Bytes.toBytes(cubeId)));
                 destAclHtable.delete(new Delete(Bytes.toBytes(modelId)));
-                destAclHtable.flushCommits();
             } finally {
                 IOUtils.closeQuietly(destAclHtable);
             }
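
Put.add(byte[], byte[], byte[]) is gone in 1.x; Put.addColumn() is the direct
replacement. The ACL copy above reads cells from the source table, optionally
rewrites the value, and writes the result, as in this minimal sketch (method and
class names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;

    public class CopyRowSketch {
        static void copyRow(Table src, Table dst, byte[] row) throws IOException {
            Result result = src.get(new Get(row));
            if (result.listCells() == null)
                return;
            Put put = new Put(row);
            for (Cell cell : result.listCells()) {
                byte[] family = CellUtil.cloneFamily(cell);
                byte[] qualifier = CellUtil.cloneQualifier(cell);
                byte[] value = CellUtil.cloneValue(cell);
                // any value rewriting happens here, before the cell is added;
                // addColumn() is the 1.x replacement for Put.add()
                put.addColumn(family, qualifier, value);
            }
            dst.put(put);
        }
    }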

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
index bf655a0..28f52f2 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java
@@ -1,384 +1,385 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.tools;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author yangli9
- */
-public class DeployCoprocessorCLI {
-
-    public static final String CubeObserverClassV2 = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver";
-    public static final String CubeEndpointClassV2 = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService";
-    public static final String IIEndpointClassV2 = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint";
-    public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
-    public static final String ENDPOINT_CLS_NAMAE = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
-    private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
-
-    public static void main(String[] args) throws IOException {
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
-        FileSystem fileSystem = FileSystem.get(hconf);
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
-
-        String localCoprocessorJar = new File(args[0]).getAbsolutePath();
-        logger.info("Identify coprocessor jar " + localCoprocessorJar);
-
-        List<String> tableNames = getHTableNames(kylinConfig);
-        logger.info("Identify tables " + tableNames);
-
-        if (args.length <= 1) {
-            printUsageAndExit();
-        }
-
-        String filterType = args[1].toLowerCase();
-        if (filterType.equals("-table")) {
-            tableNames = filterByTables(tableNames, Arrays.asList(args).subList(2, args.length));
-        } else if (filterType.equals("-cube")) {
-            tableNames = filterByCubes(tableNames, Arrays.asList(args).subList(2, args.length));
-        } else if (!filterType.equals("all")) {
-            printUsageAndExit();
-        }
-
-        logger.info("Will execute tables " + tableNames);
-
-        Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
-        logger.info("Old coprocessor jar: " + oldJarPaths);
-
-        Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
-        logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
-
-        List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
-
-        // Don't remove old jars, missing coprocessor jar will fail hbase
-        // removeOldJars(oldJarPaths, fileSystem);
-
-        hbaseAdmin.close();
-
-        logger.info("Processed " + processedTables);
-        logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
-    }
-
-    private static void printUsageAndExit() {
-        logger.warn("Probe run, exiting.");
-        logger.info("Usage: bin/kylin.sh org.apache.kylin.job.tools.DeployCoprocessorCLI JAR_FILE all|-cube CUBE1 CUBE2|-table TABLE1 TABLE2");
-        System.exit(0);
-    }
-
-    private static List<String> filterByCubes(List<String> allTableNames, List<String> cubeNames) {
-        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        List<String> result = Lists.newArrayList();
-        for (String c : cubeNames) {
-            c = c.trim();
-            if (c.endsWith(","))
-                c = c.substring(0, c.length() - 1);
-
-            CubeInstance cubeInstance = cubeManager.getCube(c);
-            for (CubeSegment segment : cubeInstance.getSegments()) {
-                String tableName = segment.getStorageLocationIdentifier();
-                if (allTableNames.contains(tableName)) {
-                    result.add(tableName);
-                }
-            }
-        }
-        return result;
-    }
-
-    private static List<String> filterByTables(List<String> allTableNames, List<String> tableNames) {
-        List<String> result = Lists.newArrayList();
-        for (String t : tableNames) {
-            t = t.trim();
-            if (t.endsWith(","))
-                t = t.substring(0, t.length() - 1);
-
-            if (allTableNames.contains(t)) {
-                result.add(t);
-            }
-        }
-        return result;
-    }
-
-    public static void deployCoprocessor(HTableDescriptor tableDesc) {
-        try {
-            initHTableCoprocessor(tableDesc);
-            logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
-
-        } catch (Exception ex) {
-            logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
-            logger.error("Will try creating the table without coprocessor.");
-        }
-    }
-
-    private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
-        FileSystem fileSystem = FileSystem.get(hconf);
-
-        String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
-        Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
-
-        DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
-    }
-
-    public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
-        logger.info("Add coprocessor on " + desc.getNameAsString());
-        desc.addCoprocessor(ENDPOINT_CLS_NAMAE, hdfsCoprocessorJar, 1000, null);
-        desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null);
-    }
-
-    public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
-        logger.info("Disable " + tableName);
-        hbaseAdmin.disableTable(tableName);
-
-        logger.info("Unset coprocessor on " + tableName);
-        HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-
-        // remove coprocessors of 1.x version
-        while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) {
-            desc.removeCoprocessor(OBSERVER_CLS_NAME);
-        }
-        while (desc.hasCoprocessor(ENDPOINT_CLS_NAMAE)) {
-            desc.removeCoprocessor(ENDPOINT_CLS_NAMAE);
-        }
-        // remove coprocessors of 2.x version
-        while (desc.hasCoprocessor(CubeObserverClassV2)) {
-            desc.removeCoprocessor(CubeObserverClassV2);
-        }
-        while (desc.hasCoprocessor(CubeEndpointClassV2)) {
-            desc.removeCoprocessor(CubeEndpointClassV2);
-        }
-        while (desc.hasCoprocessor(IIEndpointClassV2)) {
-            desc.removeCoprocessor(IIEndpointClassV2);
-        }
-
-        addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
-        hbaseAdmin.modifyTable(tableName, desc);
-
-        logger.info("Enable " + tableName);
-        hbaseAdmin.enableTable(tableName);
-    }
-
-    private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
-        List<String> processed = new ArrayList<String>();
-
-        for (String tableName : tableNames) {
-            try {
-                resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
-                processed.add(tableName);
-            } catch (IOException ex) {
-                logger.error("Error processing " + tableName, ex);
-            }
-        }
-        return processed;
-    }
-
-    public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
-        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
-        FileStatus newestJar = null;
-        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
-            if (fileStatus.getPath().toString().endsWith(".jar")) {
-                if (newestJar == null) {
-                    newestJar = fileStatus;
-                } else {
-                    if (newestJar.getModificationTime() < fileStatus.getModificationTime())
-                        newestJar = fileStatus;
-                }
-            }
-        }
-        if (newestJar == null)
-            return null;
-
-        Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
-        logger.info("The newest coprocessor is " + path.toString());
-        return path;
-    }
-
-    public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
-        Path uploadPath = null;
-        File localCoprocessorFile = new File(localCoprocessorJar);
-
-        // check existing jars
-        if (oldJarPaths == null) {
-            oldJarPaths = new HashSet<String>();
-        }
-        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
-        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
-            if (fileStatus.getLen() == localCoprocessorJar.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified()) {
-                uploadPath = fileStatus.getPath();
-                break;
-            }
-            String filename = fileStatus.getPath().toString();
-            if (filename.endsWith(".jar")) {
-                oldJarPaths.add(filename);
-            }
-        }
-
-        // upload if not existing
-        if (uploadPath == null) {
-            // figure out a unique new jar file name
-            Set<String> oldJarNames = new HashSet<String>();
-            for (String path : oldJarPaths) {
-                oldJarNames.add(new Path(path).getName());
-            }
-            String baseName = getBaseFileName(localCoprocessorJar);
-            String newName = null;
-            int i = 0;
-            while (newName == null) {
-                newName = baseName + "-" + (i++) + ".jar";
-                if (oldJarNames.contains(newName))
-                    newName = null;
-            }
-
-            // upload
-            uploadPath = new Path(coprocessorDir, newName);
-            FileInputStream in = null;
-            FSDataOutputStream out = null;
-            try {
-                in = new FileInputStream(localCoprocessorFile);
-                out = fileSystem.create(uploadPath);
-                IOUtils.copy(in, out);
-            } finally {
-                IOUtils.closeQuietly(in);
-                IOUtils.closeQuietly(out);
-            }
-
-            fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
-
-        }
-
-        uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
-        return uploadPath;
-    }
-
-    private static String getBaseFileName(String localCoprocessorJar) {
-        File localJar = new File(localCoprocessorJar);
-        String baseName = localJar.getName();
-        if (baseName.endsWith(".jar"))
-            baseName = baseName.substring(0, baseName.length() - ".jar".length());
-        return baseName;
-    }
-
-    private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
-        String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
-        Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
-        fileSystem.mkdirs(coprocessorDir);
-        return coprocessorDir;
-    }
-
-    private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException {
-        HashSet<String> result = new HashSet<String>();
-
-        for (String tableName : tableNames) {
-            HTableDescriptor tableDescriptor = null;
-            try {
-                tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-            } catch (TableNotFoundException e) {
-                logger.warn("Table not found " + tableName, e);
-                continue;
-            }
-
-            Matcher keyMatcher;
-            Matcher valueMatcher;
-            for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
-                keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
-                if (!keyMatcher.matches()) {
-                    continue;
-                }
-                valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
-                if (!valueMatcher.matches()) {
-                    continue;
-                }
-
-                String jarPath = valueMatcher.group(1).trim();
-                String clsName = valueMatcher.group(2).trim();
-
-                if (OBSERVER_CLS_NAME.equals(clsName)) {
-                    result.add(jarPath);
-                }
-            }
-        }
-
-        return result;
-    }
-
-    private static List<String> getHTableNames(KylinConfig config) {
-        CubeManager cubeMgr = CubeManager.getInstance(config);
-
-        ArrayList<String> result = new ArrayList<String>();
-        for (CubeInstance cube : cubeMgr.listAllCubes()) {
-            for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
-                String tableName = seg.getStorageLocationIdentifier();
-                if (StringUtils.isBlank(tableName) == false) {
-                    result.add(tableName);
-                    System.out.println("added new table: " + tableName);
-                }
-            }
-        }
-
-        for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) {
-            for (IISegment seg : ii.getSegments(SegmentStatusEnum.READY)) {
-                String tableName = seg.getStorageLocationIdentifier();
-                if (StringUtils.isBlank(tableName) == false) {
-                    result.add(tableName);
-                    System.out.println("added new table: " + tableName);
-                }
-            }
-        }
-
-        return result;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.tools;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * @author yangli9
+ */
+public class DeployCoprocessorCLI {
+
+    public static final String CubeObserverClassV2 = "org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.AggregateRegionObserver";
+    public static final String CubeEndpointClassV2 = "org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.CubeVisitService";
+    public static final String IIEndpointClassV2 = "org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.IIEndpoint";
+    public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
+    public static final String ENDPOINT_CLS_NAMAE = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
+    private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
+
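+    /**
+     * Re-deploys the Kylin coprocessor jar on every selected HTable: collects the
+     * candidate tables, uploads the local jar to HDFS (reusing an identical earlier
+     * upload when one exists), then rewires each table to the new jar.
+     * Example invocation (jar name is illustrative):
+     *   bin/kylin.sh org.apache.kylin.job.tools.DeployCoprocessorCLI kylin-coprocessor.jar all
+     */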
+    public static void main(String[] args) throws IOException {
+        // Validate arguments before args[0] and args[1] are accessed
+        if (args.length <= 1) {
+            printUsageAndExit();
+        }
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+        FileSystem fileSystem = FileSystem.get(hconf);
+        Admin hbaseAdmin = HBaseConnection.get().getAdmin();
+
+        String localCoprocessorJar = new File(args[0]).getAbsolutePath();
+        logger.info("Identify coprocessor jar " + localCoprocessorJar);
+
+        List<String> tableNames = getHTableNames(kylinConfig);
+        logger.info("Identify tables " + tableNames);
+
+        String filterType = args[1].toLowerCase();
+        if (filterType.equals("-table")) {
+            tableNames = filterByTables(tableNames, Arrays.asList(args).subList(2, args.length));
+        } else if (filterType.equals("-cube")) {
+            tableNames = filterByCubes(tableNames, Arrays.asList(args).subList(2, args.length));
+        } else if (!filterType.equals("all")) {
+            printUsageAndExit();
+        }
+
+        logger.info("Will execute tables " + tableNames);
+
+        Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
+        logger.info("Old coprocessor jar: " + oldJarPaths);
+
+        Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
+        logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
+
+        List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
+
+        // Don't remove old jars; regions that still reference a missing coprocessor jar will fail to open
+        // removeOldJars(oldJarPaths, fileSystem);
+
+        hbaseAdmin.close();
+
+        logger.info("Processed " + processedTables);
+        logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
+    }
+
+    private static void printUsageAndExit() {
+        logger.warn("Probe run, exiting.");
+        logger.info("Usage: bin/kylin.sh org.apache.kylin.job.tools.DeployCoprocessorCLI JAR_FILE all|-cube CUBE1 CUBE2|-table TABLE1 TABLE2");
+        System.exit(0);
+    }
+
+    private static List<String> filterByCubes(List<String> allTableNames, List<String> cubeNames) {
+        CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        List<String> result = Lists.newArrayList();
+        for (String c : cubeNames) {
+            c = c.trim();
+            if (c.endsWith(","))
+                c = c.substring(0, c.length() - 1);
+
+            CubeInstance cubeInstance = cubeManager.getCube(c);
+            if (cubeInstance == null) {
+                logger.warn("Cube not found: " + c);
+                continue;
+            }
+            for (CubeSegment segment : cubeInstance.getSegments()) {
+                String tableName = segment.getStorageLocationIdentifier();
+                if (allTableNames.contains(tableName)) {
+                    result.add(tableName);
+                }
+            }
+        }
+        return result;
+    }
+
+    private static List<String> filterByTables(List<String> allTableNames, List<String> tableNames) {
+        List<String> result = Lists.newArrayList();
+        for (String t : tableNames) {
+            t = t.trim();
+            if (t.endsWith(","))
+                t = t.substring(0, t.length() - 1);
+
+            if (allTableNames.contains(t)) {
+                result.add(t);
+            }
+        }
+        return result;
+    }
+
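+    // Called when Kylin creates a new HTable; if coprocessor setup fails, the error
+    // is logged and table creation proceeds without the coprocessor.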
+    public static void deployCoprocessor(HTableDescriptor tableDesc) {
+        try {
+            initHTableCoprocessor(tableDesc);
+            logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor.");
+
+        } catch (Exception ex) {
+            logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), ex);
+            logger.error("Will try creating the table without coprocessor.");
+        }
+    }
+
+    private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+        FileSystem fileSystem = FileSystem.get(hconf);
+
+        String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
+        Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
+
+        DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+    }
+
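+    // Attaches the endpoint (priority 1000) and observer (priority 1001) from the
+    // HDFS jar; the explicit priorities keep their load order deterministic.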
+    public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
+        logger.info("Add coprocessor on " + desc.getNameAsString());
+        desc.addCoprocessor(ENDPOINT_CLS_NAMAE, hdfsCoprocessorJar, 1000, null);
+        desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null);
+    }
+
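+    // The table is taken offline while its descriptor is rewritten: all known Kylin
+    // coprocessor entries (1.x and 2.x class names) are stripped before the new jar
+    // is attached, then the table is re-enabled.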
+    public static void resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
+        logger.info("Disable " + tableName);
+        hbaseAdmin.disableTable(TableName.valueOf(tableName));
+
+        logger.info("Unset coprocessor on " + tableName);
+        HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+
+        // remove coprocessors of 1.x version
+        while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) {
+            desc.removeCoprocessor(OBSERVER_CLS_NAME);
+        }
+        while (desc.hasCoprocessor(ENDPOINT_CLS_NAMAE)) {
+            desc.removeCoprocessor(ENDPOINT_CLS_NAMAE);
+        }
+        // remove coprocessors of 2.x version
+        while (desc.hasCoprocessor(CubeObserverClassV2)) {
+            desc.removeCoprocessor(CubeObserverClassV2);
+        }
+        while (desc.hasCoprocessor(CubeEndpointClassV2)) {
+            desc.removeCoprocessor(CubeEndpointClassV2);
+        }
+        while (desc.hasCoprocessor(IIEndpointClassV2)) {
+            desc.removeCoprocessor(IIEndpointClassV2);
+        }
+
+        addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+        hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+
+        logger.info("Enable " + tableName);
+        hbaseAdmin.enableTable(TableName.valueOf(tableName));
+    }
+
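+    // Failures are logged per table so one bad table does not abort the remaining deployments.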
+    private static List<String> resetCoprocessorOnHTables(Admin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
+        List<String> processed = new ArrayList<String>();
+
+        for (String tableName : tableNames) {
+            try {
+                resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
+                processed.add(tableName);
+            } catch (IOException ex) {
+                logger.error("Error processing " + tableName, ex);
+            }
+        }
+        return processed;
+    }
+
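+    // Returns the most recently modified .jar in the coprocessor HDFS directory, or null if none exists.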
+    public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
+        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
+        FileStatus newestJar = null;
+        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+            if (fileStatus.getPath().toString().endsWith(".jar")) {
+                if (newestJar == null) {
+                    newestJar = fileStatus;
+                } else {
+                    if (newestJar.getModificationTime() < fileStatus.getModificationTime())
+                        newestJar = fileStatus;
+                }
+            }
+        }
+        if (newestJar == null)
+            return null;
+
+        Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
+        logger.info("The newest coprocessor is " + path.toString());
+        return path;
+    }
+
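+    // Reuses an HDFS jar whose size and modification time match the local file;
+    // otherwise uploads under a fresh "<baseName>-<i>.jar" name so jars still
+    // referenced by live tables are never overwritten.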
+    public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
+        Path uploadPath = null;
+        File localCoprocessorFile = new File(localCoprocessorJar);
+
+        // check existing jars
+        if (oldJarPaths == null) {
+            oldJarPaths = new HashSet<String>();
+        }
+        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
+        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+            if (fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified()) {
+                uploadPath = fileStatus.getPath();
+                break;
+            }
+            String filename = fileStatus.getPath().toString();
+            if (filename.endsWith(".jar")) {
+                oldJarPaths.add(filename);
+            }
+        }
+
+        // upload if not existing
+        if (uploadPath == null) {
+            // figure out a unique new jar file name
+            Set<String> oldJarNames = new HashSet<String>();
+            for (String path : oldJarPaths) {
+                oldJarNames.add(new Path(path).getName());
+            }
+            String baseName = getBaseFileName(localCoprocessorJar);
+            String newName = null;
+            int i = 0;
+            while (newName == null) {
+                newName = baseName + "-" + (i++) + ".jar";
+                if (oldJarNames.contains(newName))
+                    newName = null;
+            }
+
+            // upload
+            uploadPath = new Path(coprocessorDir, newName);
+            FileInputStream in = null;
+            FSDataOutputStream out = null;
+            try {
+                in = new FileInputStream(localCoprocessorFile);
+                out = fileSystem.create(uploadPath);
+                IOUtils.copy(in, out);
+            } finally {
+                IOUtils.closeQuietly(in);
+                IOUtils.closeQuietly(out);
+            }
+
+            fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
+
+        }
+
+        uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
+        return uploadPath;
+    }
+
+    private static String getBaseFileName(String localCoprocessorJar) {
+        File localJar = new File(localCoprocessorJar);
+        String baseName = localJar.getName();
+        if (baseName.endsWith(".jar"))
+            baseName = baseName.substring(0, baseName.length() - ".jar".length());
+        return baseName;
+    }
+
+    private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
+        String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
+        Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
+        fileSystem.mkdirs(coprocessorDir);
+        return coprocessorDir;
+    }
+
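+    // Parses each table descriptor's coprocessor attributes with HBase's
+    // CP_HTD_ATTR key/value patterns and collects jar paths registered for the
+    // Kylin observer class.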
+    private static Set<String> getCoprocessorJarPaths(Admin hbaseAdmin, List<String> tableNames) throws IOException {
+        HashSet<String> result = new HashSet<String>();
+
+        for (String tableName : tableNames) {
+            HTableDescriptor tableDescriptor = null;
+            try {
+                tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+            } catch (TableNotFoundException e) {
+                logger.warn("Table not found " + tableName, e);
+                continue;
+            }
+
+            Matcher keyMatcher;
+            Matcher valueMatcher;
+            for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
+                keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
+                if (!keyMatcher.matches()) {
+                    continue;
+                }
+                valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
+                if (!valueMatcher.matches()) {
+                    continue;
+                }
+
+                String jarPath = valueMatcher.group(1).trim();
+                String clsName = valueMatcher.group(2).trim();
+
+                if (OBSERVER_CLS_NAME.equals(clsName)) {
+                    result.add(jarPath);
+                }
+            }
+        }
+
+        return result;
+    }
+
+    private static List<String> getHTableNames(KylinConfig config) {
+        CubeManager cubeMgr = CubeManager.getInstance(config);
+
+        ArrayList<String> result = new ArrayList<String>();
+        for (CubeInstance cube : cubeMgr.listAllCubes()) {
+            for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
+                String tableName = seg.getStorageLocationIdentifier();
+                if (!StringUtils.isBlank(tableName)) {
+                    result.add(tableName);
+                    logger.info("added new table: " + tableName);
+                }
+            }
+        }
+
+        for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) {
+            for (IISegment seg : ii.getSegments(SegmentStatusEnum.READY)) {
+                String tableName = seg.getStorageLocationIdentifier();
+                if (!StringUtils.isBlank(tableName)) {
+                    result.add(tableName);
+                    logger.info("added new table: " + tableName);
+                }
+            }
+        }
+
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
index 70e1df6..5fe5e58 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java
@@ -28,13 +28,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.Bytes;
@@ -74,8 +74,7 @@ public class GridTableHBaseBenchmark {
     public static void testGridTable(double hitRatio, double indexRatio) throws IOException {
         System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio);
         String hbaseUrl = "hbase"; // use hbase-site.xml on classpath
-
-        HConnection conn = HBaseConnection.get(hbaseUrl);
+        Connection conn = HBaseConnection.get(hbaseUrl);
         createHTableIfNeeded(conn, TEST_TABLE);
         prepareData(conn);
 
@@ -91,10 +90,10 @@ public class GridTableHBaseBenchmark {
 
     }
 
-    private static void testColumnScan(HConnection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
+    private static void testColumnScan(Connection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
         Stats stats = new Stats("COLUMN_SCAN");
 
-        HTableInterface table = conn.getTable(TEST_TABLE);
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
         try {
             stats.markStart();
 
@@ -122,20 +121,20 @@ public class GridTableHBaseBenchmark {
         }
     }
 
-    private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException {
+    private static void testRowScanNoIndexFullScan(Connection conn, boolean[] hits) throws IOException {
         fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL"));
     }
 
-    private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException {
+    private static void testRowScanNoIndexSkipScan(Connection conn, boolean[] hits) throws IOException {
         jumpScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_SKIP"));
     }
 
-    private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException {
+    private static void testRowScanWithIndex(Connection conn, boolean[] hits) throws IOException {
         jumpScan(conn, hits, new Stats("ROW_SCAN_IDX"));
     }
 
-    private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
-        HTableInterface table = conn.getTable(TEST_TABLE);
+    private static void fullScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
         try {
             stats.markStart();
 
@@ -156,11 +155,11 @@ public class GridTableHBaseBenchmark {
         }
     }
 
-    private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException {
+    private static void jumpScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
 
         final int jumpThreshold = 6; // compensate for Scan() overhead, totally by experience
 
-        HTableInterface table = conn.getTable(TEST_TABLE);
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
         try {
 
             stats.markStart();
@@ -204,8 +203,8 @@ public class GridTableHBaseBenchmark {
         }
     }
 
-    private static void prepareData(HConnection conn) throws IOException {
-        HTableInterface table = conn.getTable(TEST_TABLE);
+    private static void prepareData(Connection conn) throws IOException {
+        Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
 
         try {
             // check how many rows existing
@@ -232,7 +231,7 @@ public class GridTableHBaseBenchmark {
                 byte[] rowkey = Bytes.toBytes(i);
                 Put put = new Put(rowkey);
                 byte[] cell = randomBytes();
-                put.add(CF, QN, cell);
+                put.addColumn(CF, QN, cell);
                 table.put(put);
                 nBytes += cell.length;
                 dot(i, N_ROWS);
@@ -258,8 +257,8 @@ public class GridTableHBaseBenchmark {
         return bytes;
     }
 
-    private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException {
-        HBaseAdmin hbase = new HBaseAdmin(conn);
+    private static void createHTableIfNeeded(Connection conn, String tableName) throws IOException {
+        Admin hbase = conn.getAdmin();
 
         try {
             boolean tableExist = false;

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
index 53930e3..e283748 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java
@@ -23,12 +23,11 @@ import java.io.IOException;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,8 +70,7 @@ public class HtableAlterMetadataCLI extends AbstractHadoopJob {
     }
 
     private void alter() throws IOException {
-        Configuration conf = HBaseConfiguration.create();
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Admin hbaseAdmin = HBaseConnection.get().getAdmin();
         HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
 
         hbaseAdmin.disableTable(table.getTableName());


[2/3] kylin git commit: KYLIN-920 & KYLIN-782 & KYLIN-1422 Upgrade to HBase 1.1 (with help from murkrishn)

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
index 3329d27..4d44088 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java
@@ -22,11 +22,12 @@ import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.slf4j.Logger;
@@ -69,8 +70,8 @@ public class RowCounterCLI {
 
         logger.info("My Scan " + scan.toString());
 
-        HConnection conn = HConnectionManager.createConnection(conf);
-        HTableInterface tableInterface = conn.getTable(htableName);
+        Connection conn = ConnectionFactory.createConnection(conf);
+        Table tableInterface = conn.getTable(TableName.valueOf(htableName));
 
         Iterator<Result> iterator = tableInterface.getScanner(scan).iterator();
         int counter = 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 78e6ab3..7421b0b 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.ZookeeperJobLock;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
@@ -310,7 +311,7 @@ public class BuildCubeWithEngineTest {
         Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
         String tableName = segment.getStorageLocationIdentifier();
         HTable table = new HTable(conf, tableName);
-        HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table);
+        HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, HBaseConnection.get());
         Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
         long totalSize = 0;
         for (Long size : sizeMap.values()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java b/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
index e784a41..95a483d 100644
--- a/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
+++ b/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java
@@ -22,10 +22,11 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
@@ -39,7 +40,7 @@ public class ExportHBaseData {
     KylinConfig kylinConfig;
     HTableDescriptor[] allTables;
     Configuration config;
-    HBaseAdmin hbase;
+    Admin admin;
     CliCommandExecutor cli;
     String exportHdfsFolder;
     String exportLocalFolderParent;
@@ -75,12 +76,11 @@ public class ExportHBaseData {
         int cut = metadataUrl.indexOf('@');
         tableNameBase = metadataUrl.substring(0, cut);
         String hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1);
-
-        HConnection conn = HBaseConnection.get(hbaseUrl);
+        Connection conn = HBaseConnection.get(hbaseUrl);
         try {
-            hbase = new HBaseAdmin(conn);
-            config = hbase.getConfiguration();
-            allTables = hbase.listTables();
+            admin = conn.getAdmin();
+            config = admin.getConfiguration();
+            allTables = admin.listTables();
         } catch (IOException e) {
             e.printStackTrace();
             throw e;
@@ -89,6 +89,8 @@ public class ExportHBaseData {
 
     public void tearDown() {
 
+        // close hbase admin
+        IOUtils.closeQuietly(admin);
         // cleanup hdfs
         try {
             if (cli != null && exportHdfsFolder != null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java b/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
index f2b9ed6..5a04d20 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java
@@ -22,8 +22,11 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.util.Bytes;
 
 /**
@@ -90,13 +93,15 @@ public class TestHbaseClient {
         conf.set("hbase.zookeeper.quorum", "hbase_host");
         conf.set("zookeeper.znode.parent", "/hbase-unsecure");
 
-        HTable table = new HTable(conf, "test1");
+        Connection connection = ConnectionFactory.createConnection(conf);
+        Table table = connection.getTable(TableName.valueOf("test1"));
         Put put = new Put(Bytes.toBytes("row1"));
 
-        put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
-        put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
+        put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
+        put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
 
         table.put(put);
         table.close();
+        connection.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java b/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java
index 9f9c23c..f5f94c8 100644
--- a/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java
+++ b/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java
@@ -23,10 +23,11 @@ import java.io.IOException;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.BytesUtil;
@@ -60,11 +61,11 @@ public class HBaseRowDigestTest extends HBaseMetadataTestCase {
     @Test
     public static void test() throws IOException {
         String hbaseUrl = "hbase"; // use hbase-site.xml on classpath
-        HConnection conn = null;
-        HTableInterface table = null;
+        Connection conn = null;
+        Table table = null;
         try {
             conn = HBaseConnection.get(hbaseUrl);
-            table = conn.getTable("KYLIN_II_YTYWP3CQGJ");
+            table = conn.getTable(TableName.valueOf("KYLIN_II_YTYWP3CQGJ"));
             ResultScanner scanner = table.getScanner(CF, QN);
             StringBuffer sb = new StringBuffer();
             while (true) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/monitor/pom.xml
----------------------------------------------------------------------
diff --git a/monitor/pom.xml b/monitor/pom.xml
index 820934f..399535d 100644
--- a/monitor/pom.xml
+++ b/monitor/pom.xml
@@ -39,6 +39,12 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java
----------------------------------------------------------------------
diff --git a/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java b/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java
index 97200fc..94b3937 100644
--- a/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java
+++ b/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java
@@ -20,18 +20,21 @@ package org.apache.kylin.monitor;
 
 import java.io.IOException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.log4j.Logger;
 
 /**
@@ -122,11 +125,10 @@ public class MonitorMetaManager {
     public static String getListWithRowkey(String table, String rowkey) throws IOException {
         Result result = getResultByRowKey(table, rowkey);
         String fileList = null;
-        if (result.list() != null) {
-            for (KeyValue kv : result.list()) {
-                fileList = Bytes.toString(kv.getValue());
+        if (result.listCells() != null) {
+            for (Cell cell : result.listCells()) {
+                fileList = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
             }
-
         }
         fileList = fileList == null ? "" : fileList;
         return fileList;
@@ -164,16 +166,20 @@ public class MonitorMetaManager {
      * create table in hbase
      */
     public static void creatTable(String tableName, String[] family) throws Exception {
-        HBaseAdmin admin = new HBaseAdmin(conf);
-        HTableDescriptor desc = new HTableDescriptor(tableName);
-        for (int i = 0; i < family.length; i++) {
-            desc.addFamily(new HColumnDescriptor(family[i]));
-        }
-        if (admin.tableExists(tableName)) {
-            logger.info("table Exists!");
-        } else {
-            admin.createTable(desc);
-            logger.info("create table Success!");
+        Admin admin = HBaseConnection.get().getAdmin();
+        try {
+            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+            for (int i = 0; i < family.length; i++) {
+                desc.addFamily(new HColumnDescriptor(family[i]));
+            }
+            if (admin.tableExists(TableName.valueOf(tableName))) {
+                logger.info("table Exists!");
+            } else {
+                admin.createTable(desc);
+                logger.info("create table Success!");
+            }
+        } finally {
+            IOUtils.closeQuietly(admin);
         }
     }
 
@@ -181,13 +187,15 @@ public class MonitorMetaManager {
      * update cell in hbase
      */
     public static void updateData(String tableName, String rowKey, String family, String column, String value) throws IOException {
-        HTable table = new HTable(conf, Bytes.toBytes(tableName));
+        Table table = HBaseConnection.get().getTable(TableName.valueOf(tableName));
         Put put = new Put(rowKey.getBytes());
-        put.add(family.getBytes(), column.getBytes(), value.getBytes());
+        put.addColumn(family.getBytes(), column.getBytes(), value.getBytes());
         try {
             table.put(put);
         } catch (IOException e) {
             e.printStackTrace();
+        } finally {
+            IOUtils.closeQuietly(table);
         }
         logger.info("update table [" + tableName + "]");
         logger.info("rowKey [" + rowKey + "]");
@@ -200,9 +208,10 @@ public class MonitorMetaManager {
      * get result by rowkey
      */
     public static Result getResultByRowKey(String tableName, String rowKey) throws IOException {
-        HTable table = new HTable(conf, Bytes.toBytes(tableName));
+        Table table = HBaseConnection.get().getTable(TableName.valueOf(tableName));
         Get get = new Get(Bytes.toBytes(rowKey));
         Result result = table.get(get);
+        IOUtils.closeQuietly(table);
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 793a5f2..acb2acf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,12 +45,13 @@
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 
         <!-- Hadoop versions -->
-        <hadoop2.version>2.6.0</hadoop2.version>
-        <yarn.version>2.6.0</yarn.version>
+        <hadoop2.version>2.7.1</hadoop2.version>
+        <yarn.version>2.7.1</yarn.version>
         <zookeeper.version>3.4.6</zookeeper.version>
-        <hive.version>0.14.0</hive.version>
-        <hive-hcatalog.version>0.14.0</hive-hcatalog.version>
-        <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version>
+        <hive.version>1.2.1</hive.version>
+        <hive-hcatalog.version>1.2.1</hive-hcatalog.version>
+        <hbase-hadoop2.version>1.1.1</hbase-hadoop2.version>
+        <curator.version>2.7.1</curator.version>
 
         <!-- Dependency versions -->
         <antlr.version>3.4</antlr.version>
@@ -61,6 +62,7 @@
 
         <!-- Commons -->
         <commons-cli.version>1.2</commons-cli.version>
+        <commons-codec.version>1.4</commons-codec.version>
         <commons-lang.version>2.6</commons-lang.version>
         <commons-lang3.version>3.1</commons-lang3.version>
         <commons-io.version>2.4</commons-io.version>
@@ -90,9 +92,6 @@
         <!-- Calcite Version -->
         <calcite.version>1.6.0</calcite.version>
 
-        <!-- Curator.version Version -->
-        <curator.version>2.6.0</curator.version>
-
         <!-- Sonar -->
         <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
         <sonar.dynamicAnalysis>reuseReports</sonar.dynamicAnalysis>
@@ -279,6 +278,11 @@
                 <version>${commons-cli.version}</version>
             </dependency>
             <dependency>
+                <groupId>commons-codec</groupId>
+                <artifactId>commons-codec</artifactId>
+                <version>${commons-codec.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>commons-lang</groupId>
                 <artifactId>commons-lang</artifactId>
                 <version>${commons-lang.version}</version>

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/server/src/main/java/org/apache/kylin/rest/service/AclService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/AclService.java b/server/src/main/java/org/apache/kylin/rest/service/AclService.java
index ea2a48e..8a1cf6d 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/AclService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/AclService.java
@@ -29,13 +29,14 @@ import java.util.Map;
 import java.util.NavigableMap;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.kylin.common.KylinConfig;
@@ -130,9 +131,9 @@ public class AclService implements MutableAclService {
     @Override
     public List<ObjectIdentity> findChildren(ObjectIdentity parentIdentity) {
         List<ObjectIdentity> oids = new ArrayList<ObjectIdentity>();
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
 
             Scan scan = new Scan();
             SingleColumnValueFilter parentFilter = new SingleColumnValueFilter(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), CompareOp.EQUAL, domainObjSerializer.serialize(new DomainObjectInfo(parentIdentity)));
@@ -179,10 +180,10 @@ public class AclService implements MutableAclService {
     @Override
     public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> oids, List<Sid> sids) throws NotFoundException {
         Map<ObjectIdentity, Acl> aclMaps = new HashMap<ObjectIdentity, Acl>();
-        HTableInterface htable = null;
+        Table htable = null;
         Result result = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
 
             for (ObjectIdentity oid : oids) {
                 result = htable.get(new Get(Bytes.toBytes(String.valueOf(oid.getIdentifier()))));
@@ -231,16 +232,15 @@ public class AclService implements MutableAclService {
         Authentication auth = SecurityContextHolder.getContext().getAuthentication();
         PrincipalSid sid = new PrincipalSid(auth);
 
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
             Put put = new Put(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier())));
-            put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType()));
-            put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid)));
-            put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true));
+            put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType()));
+            put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid)));
+            put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true));
 
             htable.put(put);
-            htable.flushCommits();
 
             logger.debug("ACL of " + objectIdentity + " created successfully.");
         } catch (IOException e) {
@@ -254,9 +254,9 @@ public class AclService implements MutableAclService {
 
     @Override
     public void deleteAcl(ObjectIdentity objectIdentity, boolean deleteChildren) throws ChildrenExistException {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
             Delete delete = new Delete(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier())));
 
             List<ObjectIdentity> children = findChildren(objectIdentity);
@@ -269,7 +269,6 @@ public class AclService implements MutableAclService {
             }
 
             htable.delete(delete);
-            htable.flushCommits();
 
             logger.debug("ACL of " + objectIdentity + " deleted successfully.");
         } catch (IOException e) {
@@ -287,27 +286,26 @@ public class AclService implements MutableAclService {
             throw e;
         }
 
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
             Delete delete = new Delete(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier())));
-            delete.deleteFamily(Bytes.toBytes(ACL_ACES_FAMILY));
+            delete.addFamily(Bytes.toBytes(ACL_ACES_FAMILY));
             htable.delete(delete);
 
             Put put = new Put(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier())));
 
             if (null != acl.getParentAcl()) {
-                put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity())));
+                put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity())));
             }
 
             for (AccessControlEntry ace : acl.getEntries()) {
                 AceInfo aceInfo = new AceInfo(ace);
-                put.add(Bytes.toBytes(ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo));
+                put.addColumn(Bytes.toBytes(ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo));
             }
 
             if (!put.isEmpty()) {
                 htable.put(put);
-                htable.flushCommits();
 
                 logger.debug("ACL of " + acl.getObjectIdentity() + " updated successfully.");
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 5020dc3..4578d9f 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -28,9 +28,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.HBaseRegionSizeCalculator;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
@@ -408,33 +408,24 @@ public class CubeService extends BasicService {
      * @throws IOException Exception when HTable resource is not closed correctly.
      */
     public HBaseResponse getHTableInfo(String tableName) throws IOException {
-        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
-        HTable table = null;
+        Connection conn = HBaseConnection.get();
         HBaseResponse hr = null;
         long tableSize = 0;
         int regionCount = 0;
 
-        try {
-            table = new HTable(hconf, tableName);
-
-            HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table);
-            Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
+        HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn);
+        Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
 
-            for (long s : sizeMap.values()) {
-                tableSize += s;
-            }
+        for (long s : sizeMap.values()) {
+            tableSize += s;
+        }
 
-            regionCount = sizeMap.size();
+        regionCount = sizeMap.size();
 
-            // Set response.
-            hr = new HBaseResponse();
-            hr.setTableSize(tableSize);
-            hr.setRegionCount(regionCount);
-        } finally {
-            if (null != table) {
-                table.close();
-            }
-        }
+        // Set response.
+        hr = new HBaseResponse();
+        hr.setTableSize(tableSize);
+        hr.setRegionCount(regionCount);
 
         return hr;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 764df4b..7d14021 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -42,10 +42,11 @@ import javax.sql.DataSource;
 
 import org.apache.calcite.avatica.ColumnMetaData.Rep;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.persistence.HBaseConnection;
@@ -124,14 +125,13 @@ public class QueryService extends BasicService {
         Query[] queryArray = new Query[queries.size()];
 
         byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Put put = new Put(Bytes.toBytes(creator));
-            put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
+            put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
 
             htable.put(put);
-            htable.flushCommits();
         } finally {
             IOUtils.closeQuietly(htable);
         }
@@ -157,14 +157,13 @@ public class QueryService extends BasicService {
 
         Query[] queryArray = new Query[queries.size()];
         byte[] bytes = querySerializer.serialize(queries.toArray(queryArray));
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Put put = new Put(Bytes.toBytes(creator));
-            put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
+            put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
 
             htable.put(put);
-            htable.flushCommits();
         } finally {
             IOUtils.closeQuietly(htable);
         }
@@ -176,9 +175,9 @@ public class QueryService extends BasicService {
         }
 
         List<Query> queries = new ArrayList<Query>();
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Get get = new Get(Bytes.toBytes(creator));
             get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY));
             Result result = htable.get(get);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/server/src/main/java/org/apache/kylin/rest/service/UserService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/UserService.java b/server/src/main/java/org/apache/kylin/rest/service/UserService.java
index d665ab9..d03cd55 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/UserService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/UserService.java
@@ -25,13 +25,14 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.Bytes;
@@ -75,9 +76,9 @@ public class UserService implements UserManager {
 
     @Override
     public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
 
             Get get = new Get(Bytes.toBytes(username));
             get.addFamily(Bytes.toBytes(USER_AUTHORITY_FAMILY));
@@ -106,15 +107,14 @@ public class UserService implements UserManager {
 
     @Override
     public void updateUser(UserDetails user) {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
             byte[] userAuthorities = serialize(user.getAuthorities());
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Put put = new Put(Bytes.toBytes(user.getUsername()));
-            put.add(Bytes.toBytes(USER_AUTHORITY_FAMILY), Bytes.toBytes(USER_AUTHORITY_COLUMN), userAuthorities);
+            put.addColumn(Bytes.toBytes(USER_AUTHORITY_FAMILY), Bytes.toBytes(USER_AUTHORITY_COLUMN), userAuthorities);
 
             htable.put(put);
-            htable.flushCommits();
         } catch (IOException e) {
             throw new RuntimeException(e.getMessage(), e);
         } finally {
@@ -124,13 +124,12 @@ public class UserService implements UserManager {
 
     @Override
     public void deleteUser(String username) {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Delete delete = new Delete(Bytes.toBytes(username));
 
             htable.delete(delete);
-            htable.flushCommits();
         } catch (IOException e) {
             throw new RuntimeException(e.getMessage(), e);
         } finally {
@@ -145,9 +144,9 @@ public class UserService implements UserManager {
 
     @Override
     public boolean userExists(String username) {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Result result = htable.get(new Get(Bytes.toBytes(username)));
 
             return null != result && !result.isEmpty();
@@ -164,10 +163,10 @@ public class UserService implements UserManager {
         s.addColumn(Bytes.toBytes(USER_AUTHORITY_FAMILY), Bytes.toBytes(USER_AUTHORITY_COLUMN));
 
         List<String> authorities = new ArrayList<String>();
-        HTableInterface htable = null;
+        Table htable = null;
         ResultScanner scanner = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             scanner = htable.getScanner(s);
 
             for (Result result = scanner.next(); result != null; result = scanner.next()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java b/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java
index f529145..4aeb676 100644
--- a/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java
@@ -25,7 +25,6 @@ import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;
-
 import org.roaringbitmap.RoaringBitmap;
 
 /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
index 26c4f29..fcbd3d3 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
@@ -28,16 +28,14 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.StorageException;
 import org.apache.kylin.common.util.Array;
@@ -70,7 +68,6 @@ import com.google.common.collect.Maps;
 
 /**
  * @author xjiang
- * 
  */
 public class CubeSegmentTupleIterator implements ITupleIterator {
 
@@ -84,7 +81,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
     private final List<RowValueDecoder> rowValueDecoders;
     private final StorageContext context;
     private final String tableName;
-    private final HTableInterface table;
+    private final Table table;
     private final RowKeyDecoder rowKeyDecoder;
     private final Iterator<HBaseKeyRange> rangeIterator;
 
@@ -105,7 +102,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
     private int advMeasureRowsRemaining;
     private int advMeasureRowIndex;
 
-    public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context) {
+    public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, Connection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context) {
         this.cube = cubeSeg.getCubeInstance();
         this.cubeSeg = cubeSeg;
         this.dimensions = dimensions;
@@ -115,7 +112,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         this.context = context;
         this.tableName = cubeSeg.getStorageLocationIdentifier();
         this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg);
-        
+
         measureTypes = Lists.newArrayList();
         advMeasureFillers = Lists.newArrayListWithCapacity(1);
         advMeasureIndexInRV = Lists.newArrayListWithCapacity(1);
@@ -127,7 +124,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
 
 
         try {
-            this.table = conn.getTable(tableName);
+            this.table = conn.getTable(TableName.valueOf(tableName));
         } catch (Throwable t) {
             throw new StorageException("Error when open connection to table " + tableName, t);
         }
@@ -144,9 +141,8 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
 
         if (logger.isDebugEnabled() && scan != null) {
             logger.debug("Scan " + scan.toString());
-            byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
-            if (metricsBytes != null) {
-                ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(metricsBytes);
+            ScanMetrics scanMetrics = scan.getScanMetrics();
+            if (scanMetrics != null) {
                 logger.debug("HBase Metrics: " + "count={}, ms={}, bytes={}, remote_bytes={}, regions={}, not_serving_region={}, rpc={}, rpc_retries={}, remote_rpc={}, remote_rpc_retries={}", new Object[] { scanCount, scanMetrics.sumOfMillisSecBetweenNexts, scanMetrics.countOfBytesInResults, scanMetrics.countOfBytesInRemoteResults, scanMetrics.countOfRegions, scanMetrics.countOfNSRE, scanMetrics.countOfRPCcalls, scanMetrics.countOfRPCRetries, scanMetrics.countOfRemoteRPCcalls, scanMetrics.countOfRemoteRPCRetries });
             }
         }
@@ -303,7 +299,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
     private Scan buildScan(HBaseKeyRange keyRange) {
         Scan scan = new Scan();
         tuneScanParameters(scan);
-        scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
+        scan.setScanMetricsEnabled(true);
         for (RowValueDecoder valueDecoder : this.rowValueDecoders) {
             HBaseColumnDesc hbaseColumn = valueDecoder.getHBaseColumn();
             byte[] byteFamily = Bytes.toBytes(hbaseColumn.getColumnFamilyName());
@@ -317,7 +313,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
 
     private void tuneScanParameters(Scan scan) {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
-        
+
         scan.setCaching(config.getHBaseScanCacheRows());
         scan.setMaxResultSize(config.getHBaseScanMaxResultSize());
         scan.setCacheBlocks(true);
@@ -329,19 +325,27 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
     }
 
     private void applyFuzzyFilter(Scan scan, HBaseKeyRange keyRange) {
-        List<Pair<byte[], byte[]>> fuzzyKeys = keyRange.getFuzzyKeys();
+
+        List<org.apache.kylin.common.util.Pair<byte[], byte[]>> fuzzyKeys = keyRange.getFuzzyKeys();
         if (fuzzyKeys != null && fuzzyKeys.size() > 0) {
-            FuzzyRowFilter rowFilter = new FuzzyRowFilter(convertToHBasePair(fuzzyKeys));
+
+            //https://issues.apache.org/jira/browse/HBASE-13761 introduced a bug in (2.0.0, 0.98.13, 1.0.2, 1.2.0, 1.1.1)
+            //and it was not fixed until https://issues.apache.org/jira/browse/HBASE-14269 (2.0.0, 1.2.0, 1.3.0, 0.98.15, 1.0.3, 1.1.3)
+            //if the user's HBase version falls within the not-yet-fixed range, the patched FuzzyRowFilter must be used
+            String patchedFuzzyRowFilterVersion = KylinConfig.getInstanceFromEnv().getPatchedFuzzyRowFilterVersion();
+            Filter fuzzyFilter = null;
+            if ("1.1.3".equals(patchedFuzzyRowFilterVersion)) {
+                //default behavior for this branch
+                fuzzyFilter = new org.apache.kylin.storage.hbase.filter.FuzzyRowFilter(convertToHBasePair(fuzzyKeys));
+            } else {
+                fuzzyFilter = new org.apache.hadoop.hbase.filter.FuzzyRowFilter(convertToHBasePair(fuzzyKeys));
+            }
 
             Filter filter = scan.getFilter();
             if (filter != null) {
-                // may have existed InclusiveStopFilter, see buildScan
-                FilterList filterList = new FilterList();
-                filterList.addFilter(filter);
-                filterList.addFilter(rowFilter);
-                scan.setFilter(filterList);
+                throw new RuntimeException("Scan filter not empty : " + filter);
             } else {
-                scan.setFilter(rowFilter);
+                scan.setFilter(fuzzyFilter);
             }
         }
     }

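The scan-metrics hunk above swaps the attribute-based protocol for first-class setters. A minimal sketch of the 1.1 API, assuming only the hbase-client jar (class and method names are illustrative):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.metrics.ScanMetrics;

    public class ScanMetricsSketch {
        public static Scan newInstrumentedScan() {
            Scan scan = new Scan();
            // Replaces scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, ...).
            scan.setScanMetricsEnabled(true);
            return scan;
        }

        public static void report(Scan scan) {
            // Replaces reading SCAN_ATTRIBUTES_METRICS_DATA and ProtobufUtil.toScanMetrics().
            ScanMetrics metrics = scan.getScanMetrics();
            if (metrics != null) {
                System.out.println("rpc calls: " + metrics.countOfRPCcalls.get());
            }
        }
    }
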
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
index 626b784..7510dcd 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
@@ -32,7 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.Dictionary;
@@ -142,7 +142,7 @@ public class CubeStorageEngine implements IStorageEngine {
         setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
         setLimit(filter, context);
 
-        HConnection conn = HBaseConnection.get(context.getConnUrl());
+        Connection conn = HBaseConnection.get(context.getConnUrl());
         return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java
index 918fd4b..6a76baa 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java
@@ -1,93 +1,94 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.invertedindex.model.IIDesc;
-
-/**
- * @author yangli9
- * 
- */
-public class HBaseClientKVIterator implements Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>, Closeable {
-
-    byte[] family;
-    byte[] qualifier;
-
-    HTableInterface table;
-    ResultScanner scanner;
-    Iterator<Result> iterator;
-
-    public HBaseClientKVIterator(HConnection hconn, String tableName, byte[] family, byte[] qualifier) throws IOException {
-        this.family = family;
-        this.qualifier = qualifier;
-
-        this.table = hconn.getTable(tableName);
-        this.scanner = table.getScanner(family, qualifier);
-        this.iterator = scanner.iterator();
-    }
-
-    @Override
-    public void close() {
-        IOUtils.closeQuietly(scanner);
-        IOUtils.closeQuietly(table);
-    }
-
-    @Override
-    public Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator() {
-        return new MyIterator();
-    }
-
-    private class MyIterator implements Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> {
-
-        ImmutableBytesWritable key = new ImmutableBytesWritable();
-        ImmutableBytesWritable value = new ImmutableBytesWritable();
-        Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair = new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(key, value);
-
-        @Override
-        public boolean hasNext() {
-            return iterator.hasNext();
-        }
-
-        @Override
-        public Pair<ImmutableBytesWritable, ImmutableBytesWritable> next() {
-            Result r = iterator.next();
-            Cell c = r.getColumnLatestCell(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
-            key.set(c.getRowArray(), c.getRowOffset(), c.getRowLength());
-            value.set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
-            return pair;
-        }
-
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.invertedindex.model.IIDesc;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class HBaseClientKVIterator implements Iterable<Pair<ImmutableBytesWritable, ImmutableBytesWritable>>, Closeable {
+
+    byte[] family;
+    byte[] qualifier;
+
+    Table table;
+    ResultScanner scanner;
+    Iterator<Result> iterator;
+
+    public HBaseClientKVIterator(Connection hconn, String tableName, byte[] family, byte[] qualifier) throws IOException {
+        this.family = family;
+        this.qualifier = qualifier;
+
+        this.table = hconn.getTable(TableName.valueOf(tableName));
+        this.scanner = table.getScanner(family, qualifier);
+        this.iterator = scanner.iterator();
+    }
+
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(scanner);
+        IOUtils.closeQuietly(table);
+    }
+
+    @Override
+    public Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> iterator() {
+        return new MyIterator();
+    }
+
+    private class MyIterator implements Iterator<Pair<ImmutableBytesWritable, ImmutableBytesWritable>> {
+
+        ImmutableBytesWritable key = new ImmutableBytesWritable();
+        ImmutableBytesWritable value = new ImmutableBytesWritable();
+        Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair = new Pair<ImmutableBytesWritable, ImmutableBytesWritable>(key, value);
+
+        @Override
+        public boolean hasNext() {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public Pair<ImmutableBytesWritable, ImmutableBytesWritable> next() {
+            Result r = iterator.next();
+            Cell c = r.getColumnLatestCell(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES);
+            key.set(c.getRowArray(), c.getRowOffset(), c.getRowLength());
+            value.set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+            return pair;
+        }
+
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
index afb49c0..e518a4c 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java
@@ -1,57 +1,57 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase;
-
-import java.util.ArrayList;
-
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.common.persistence.HBaseConnection;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.IStorageEngine;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointTupleIterator;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexStorageEngine implements IStorageEngine {
-
-    private IISegment seg;
-
-    public InvertedIndexStorageEngine(IIInstance ii) {
-        this.seg = ii.getFirstSegment();
-    }
-
-    @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest) {
-        String tableName = seg.getStorageLocationIdentifier();
-
-        //HConnection is cached, so need not be closed
-        HConnection conn = HBaseConnection.get(context.getConnUrl());
-        try {
-            return new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn);
-        } catch (Throwable e) {
-            e.printStackTrace();
-            throw new IllegalStateException("Error when connecting to II htable " + tableName, e);
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.kylin.common.persistence.HBaseConnection;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.IStorageEngine;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointTupleIterator;
+
+/**
+ * @author yangli9
+ */
+public class InvertedIndexStorageEngine implements IStorageEngine {
+
+    private IISegment seg;
+
+    public InvertedIndexStorageEngine(IIInstance ii) {
+        this.seg = ii.getFirstSegment();
+    }
+
+    @Override
+    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest) {
+        String tableName = seg.getStorageLocationIdentifier();
+
+        // Connection is cached, so it need not be closed
+        Connection conn = HBaseConnection.get(context.getConnUrl());
+        try {
+            return new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            throw new IllegalStateException("Error when connecting to II htable " + tableName, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java b/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
index d4e8529..4a9c574 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java
@@ -1,88 +1,91 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.token.TokenUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-
-/**
- * @author yangli9
- * 
- */
-public class PingHBaseCLI {
-
-    public static void main(String[] args) throws IOException {
-        String hbaseTable = args[0];
-
-        System.out.println("Hello friend.");
-
-        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
-        if (User.isHBaseSecurityEnabled(hconf)) {
-            try {
-                System.out.println("--------------Getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName());
-                TokenUtil.obtainAndCacheToken(hconf, UserGroupInformation.getCurrentUser());
-            } catch (InterruptedException e) {
-                System.out.println("--------------Error while getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName());
-            }
-        }
-
-        Scan scan = new Scan();
-        int limit = 20;
-
-        HConnection conn = null;
-        HTableInterface table = null;
-        ResultScanner scanner = null;
-        try {
-            conn = HConnectionManager.createConnection(hconf);
-            table = conn.getTable(hbaseTable);
-            scanner = table.getScanner(scan);
-            int count = 0;
-            for (Result r : scanner) {
-                byte[] rowkey = r.getRow();
-                System.out.println(Bytes.toStringBinary(rowkey));
-                count++;
-                if (count == limit)
-                    break;
-            }
-        } finally {
-            if (scanner != null) {
-                scanner.close();
-            }
-            if (table != null) {
-                table.close();
-            }
-            if (conn != null) {
-                conn.close();
-            }
-        }
-
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class PingHBaseCLI {
+
+    public static void main(String[] args) throws IOException, InterruptedException {
+        String hbaseTable = args[0];
+
+        System.out.println("Hello friend.");
+
+        Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration();
+        if (User.isHBaseSecurityEnabled(hconf)) {
+            Connection conn = ConnectionFactory.createConnection(hconf);
+            try {
+                UserProvider userProvider = UserProvider.instantiate(hconf);
+                TokenUtil.obtainAndCacheToken(conn, userProvider.create(UserGroupInformation.getCurrentUser()));
+            } finally {
+                conn.close();
+            }
+        }
+
+        Scan scan = new Scan();
+        int limit = 20;
+
+        Connection conn = null;
+        Table table = null;
+        ResultScanner scanner = null;
+        try {
+            conn = ConnectionFactory.createConnection(hconf);
+            table = conn.getTable(TableName.valueOf(hbaseTable));
+            scanner = table.getScanner(scan);
+            int count = 0;
+            for (Result r : scanner) {
+                byte[] rowkey = r.getRow();
+                System.out.println(Bytes.toStringBinary(rowkey));
+                count++;
+                if (count == limit)
+                    break;
+            }
+        } finally {
+            if (scanner != null) {
+                scanner.close();
+            }
+            if (table != null) {
+                table.close();
+            }
+            if (conn != null) {
+                conn.close();
+            }
+        }
+
+    }
+}

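The PingHBaseCLI rewrite above is the canonical 0.98-to-1.x client migration: HConnectionManager becomes ConnectionFactory and the String table name becomes a TableName. The same read loop can also be written with try-with-resources, since Connection, Table, and ResultScanner are all Closeable (a sketch; the table name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PingSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // ConnectionFactory.createConnection() replaces HConnectionManager.createConnection().
            try (Connection conn = ConnectionFactory.createConnection(conf);
                 Table table = conn.getTable(TableName.valueOf("my_table"));
                 ResultScanner scanner = table.getScanner(new Scan())) {
                for (Result r : scanner) {
                    System.out.println(Bytes.toStringBinary(r.getRow()));
                }
            }
        }
    }
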
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java b/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java
index e2eeed0..a07cbe4 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 
 /**
  * @author yangli9
@@ -50,7 +51,7 @@ public class RegionScannerAdapter implements RegionScanner {
     }
 
     @Override
-    public boolean next(List<Cell> result, int limit) throws IOException {
+    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
         return next(result);
     }
 
@@ -60,11 +61,16 @@ public class RegionScannerAdapter implements RegionScanner {
     }
 
     @Override
-    public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+    public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
         return next(result);
     }
 
     @Override
+    public int getBatch() {
+        return -1;
+    }
+
+    @Override
     public void close() throws IOException {
         scanner.close();
     }

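The RegionScannerAdapter hunk tracks the 1.x server-side API: the next/nextRaw overloads taking an int limit were replaced by ScannerContext, and getBatch() joined the interface. A hedged fragment of the delegating pattern used above (abstract, since the remaining RegionScanner methods are omitted for brevity):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;
    import org.apache.hadoop.hbase.regionserver.ScannerContext;

    abstract class DelegatingScannerSketch implements RegionScanner {
        protected final RegionScanner delegate;

        DelegatingScannerSketch(RegionScanner delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
            // Replaces next(List<Cell> result, int limit).
            return delegate.next(result, scannerContext);
        }

        @Override
        public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
            // Replaces nextRaw(List<Cell> result, int limit).
            return delegate.nextRaw(result, scannerContext);
        }

        @Override
        public int getBatch() {
            // New in the 1.x RegionScanner interface.
            return delegate.getBatch();
        }
    }
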
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
index d188a44..bbe3397 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.kv.RowValueDecoder;
@@ -52,7 +52,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
     private ITupleIterator segmentIterator;
     private int scanCount;
 
-    public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context) {
+    public SerializedHBaseTupleIterator(Connection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context) {
 
         this.context = context;
         int limit = context.getLimit();

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
index 8587075..450b1ae 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
@@ -26,8 +26,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -80,14 +81,14 @@ public class EndpointTupleIterator implements ITupleIterator {
 
     Iterator<List<IIProtos.IIResponse.IIRow>> regionResponsesIterator = null;
     ITupleIterator tupleIterator = null;
-    HTableInterface table = null;
+    Table table = null;
 
     int rowsInAllMetric = 0;
 
-    public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, HConnection conn) throws Throwable {
+    public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, Connection conn) throws Throwable {
 
         String tableName = segment.getStorageLocationIdentifier();
-        table = conn.getTable(tableName);
+        table = conn.getTable(TableName.valueOf(tableName));
         factTableName = segment.getIIDesc().getFactTableName();
 
         if (rootFilter == null) {
@@ -213,7 +214,7 @@ public class EndpointTupleIterator implements ITupleIterator {
     }
 
     //TODO : async callback
-    private Iterator<List<IIProtos.IIResponse.IIRow>> getResults(final IIProtos.IIRequest request, HTableInterface table) throws Throwable {
+    private Iterator<List<IIProtos.IIResponse.IIRow>> getResults(final IIProtos.IIRequest request, Table table) throws Throwable {
         Map<byte[], List<IIProtos.IIResponse.IIRow>> results = table.coprocessorService(IIProtos.RowsService.class, null, null, new Batch.Call<IIProtos.RowsService, List<IIProtos.IIResponse.IIRow>>() {
             public List<IIProtos.IIResponse.IIRow> call(IIProtos.RowsService rowsService) throws IOException {
                 ServerRpcController controller = new ServerRpcController();
@@ -236,7 +237,7 @@ public class EndpointTupleIterator implements ITupleIterator {
         int index = 0;
 
         for (int i = 0; i < columns.size(); i++) {
-            TblColRef column = columns.get(i);
+            //            TblColRef column = columns.get(i);
             //            if (!dimensions.contains(column)) {
             //                continue;
             //            }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
index a770f55..adf1bf1 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/IIEndpoint.java
@@ -90,7 +90,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop
         RegionScanner innerScanner = null;
         HRegion region = null;
         try {
-            region = env.getRegion();
+            region = (HRegion) env.getRegion();
             innerScanner = region.getScanner(buildScan());
             region.startRegionOperation();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
index 2cecd5c..c21ee36 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserver.java
@@ -93,7 +93,7 @@ public class AggregateRegionObserver extends BaseRegionObserver {
         // start/end region operation & sync on scanner is suggested by the
         // javadoc of RegionScanner.nextRaw()
         // FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb
-        HRegion region = ctxt.getEnvironment().getRegion();
+        HRegion region = (HRegion) ctxt.getEnvironment().getRegion();
         region.startRegionOperation();
         try {
             synchronized (innerScanner) {

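The two (HRegion) casts above are needed because RegionCoprocessorEnvironment.getRegion() returns the narrower Region interface as of HBase 1.1 (HBASE-12972), while this code still works against the concrete HRegion. In sketch form (class and method names are illustrative):

    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.hadoop.hbase.regionserver.Region;

    class RegionAccessSketch {
        void access(RegionCoprocessorEnvironment env) {
            // Preferred in new code: program to the Region interface.
            Region region = env.getRegion();
            // As done in this patch: cast when HRegion-specific behavior is needed.
            HRegion hRegion = (HRegion) env.getRegion();
        }
    }
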
http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
index 8075bc3..eaa7d20 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregationScanner.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
@@ -93,13 +94,18 @@ public class AggregationScanner implements RegionScanner {
     }
 
     @Override
+    public int getBatch() {
+        return outerScanner.getBatch();
+    }
+
+    @Override
     public boolean next(List<Cell> results) throws IOException {
         return outerScanner.next(results);
     }
 
     @Override
-    public boolean next(List<Cell> result, int limit) throws IOException {
-        return outerScanner.next(result, limit);
+    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+        return outerScanner.next(result, scannerContext);
     }
 
     @Override
@@ -108,8 +114,8 @@ public class AggregationScanner implements RegionScanner {
     }
 
     @Override
-    public boolean nextRaw(List<Cell> result, int limit) throws IOException {
-        return outerScanner.nextRaw(result, limit);
+    public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+        return outerScanner.nextRaw(result, scannerContext);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
index f5fb497..b1f642f 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregationCache.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
 import org.apache.kylin.storage.hbase.coprocessor.AggregationCache;
@@ -107,7 +108,7 @@ public class ObserverAggregationCache extends AggregationCache {
         }
 
         @Override
-        public boolean next(List<Cell> result, int limit) throws IOException {
+        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
             return next(result);
         }
 
@@ -117,11 +118,16 @@ public class ObserverAggregationCache extends AggregationCache {
         }
 
         @Override
-        public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+        public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
             return next(result);
         }
 
         @Override
+        public int getBatch() {
+            return innerScanner.getBatch();
+        }
+
+        @Override
         public void close() throws IOException {
             // AggregateRegionObserver.LOG.info("Kylin Scanner close()");
             innerScanner.close();

http://git-wip-us.apache.org/repos/asf/kylin/blob/4313900e/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index 5278326..b941a5e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -23,9 +23,9 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
@@ -58,7 +58,7 @@ public class ObserverEnabler {
     static final Map<String, Boolean> CUBE_OVERRIDES = Maps.newConcurrentMap();
 
     public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFiler, //
-            Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) throws IOException {
+            Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, Table table, Scan scan) throws IOException {
 
         if (context.isCoprocessorEnabled() == false) {
             return table.getScanner(scan);