You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/10 10:39:06 UTC

[3/3] incubator-kylin git commit: minor, downgrade hbase version to 0.98.4 in pom.xml to exactly match HDP 2.2

minor, downgrade hbase version to 0.98.4 in pom.xml to exactly match HDP 2.2


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3011f6fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3011f6fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3011f6fd

Branch: refs/heads/0.8
Commit: 3011f6fd9754e38983b6846e015e8c98729cfd69
Parents: 87c32b0
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Jul 10 16:35:41 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Jul 10 16:38:11 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/util/MockHTable.java    | 690 -------------------
 .../hadoop/cubev2/MergeCuboidFromHBaseJob.java  |  23 +-
 pom.xml                                         |   2 +-
 .../rest/security/MockAclHBaseStorage.java      |   1 -
 .../apache/kylin/rest/security/MockHTable.java  | 690 +++++++++++++++++++
 5 files changed, 695 insertions(+), 711 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3011f6fd/common/src/main/java/org/apache/kylin/common/util/MockHTable.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MockHTable.java b/common/src/main/java/org/apache/kylin/common/util/MockHTable.java
deleted file mode 100644
index c60a7a5..0000000
--- a/common/src/main/java/org/apache/kylin/common/util/MockHTable.java
+++ /dev/null
@@ -1,690 +0,0 @@
-/**
- * This file is licensed to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- */
-
-package org.apache.kylin.common.util;
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * MockHTable.
- *
- * original MockHTable (by agaoglu) : https://gist.github.com/agaoglu/613217#file_mock_h_table.java
- *
- * Modifications
- *
- * <ul>
- *     <li>fix filter (by k-mack) : https://gist.github.com/k-mack/4600133</li>
- *     <li>fix batch() : implement all mutate operation and fix result[] count.</li>
- *     <li>fix exists()</li>
- *     <li>fix increment() : wrong return value</li>
- *     <li>check columnFamily</li>
- *     <li>implement mutateRow()</li>
- *     <li>implement getTableName()</li>
- *     <li>implement getTableDescriptor()</li>
- *     <li>throws RuntimeException when unimplemented method was called.</li>
- *     <li>remove some methods for loading data, checking values ...</li>
- * </ul>
- */
-public class MockHTable implements HTableInterface {
-    private final String tableName;
-    private final List<String> columnFamilies = new ArrayList<>();
-
-    private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-
-    private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
-        return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
-    }
-
-    public MockHTable(String tableName) {
-        this.tableName = tableName;
-    }
-
-    public MockHTable(String tableName, String... columnFamilies) {
-        this.tableName = tableName;
-        this.columnFamilies.addAll(Arrays.asList(columnFamilies));
-    }
-
-    public void addColumnFamily(String columnFamily) {
-        this.columnFamilies.add(columnFamily);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public byte[] getTableName() {
-        return tableName.getBytes();
-    }
-
-    @Override
-    public TableName getName() {
-        return null;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Configuration getConfiguration() {
-        throw new RuntimeException(this.getClass() + " does NOT implement this method.");
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public HTableDescriptor getTableDescriptor() throws IOException {
-        HTableDescriptor table = new HTableDescriptor(tableName);
-        for (String columnFamily : columnFamilies) {
-            table.addFamily(new HColumnDescriptor(columnFamily));
-        }
-        return table;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void mutateRow(RowMutations rm) throws IOException {
-        // currently only support Put and Delete
-        for (Mutation mutation : rm.getMutations()) {
-            if (mutation instanceof Put) {
-                put((Put) mutation);
-            } else if (mutation instanceof Delete) {
-                delete((Delete) mutation);
-            }
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Result append(Append append) throws IOException {
-        throw new RuntimeException(this.getClass() + " does NOT implement this method.");
-    }
-
-    private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) {
-        List<KeyValue> ret = new ArrayList<KeyValue>();
-        for (byte[] family : rowdata.keySet())
-            for (byte[] qualifier : rowdata.get(family).keySet()) {
-                int versionsAdded = 0;
-                for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) {
-                    if (versionsAdded++ == maxVersions)
-                        break;
-                    Long timestamp = tsToVal.getKey();
-                    if (timestamp < timestampStart)
-                        continue;
-                    if (timestamp > timestampEnd)
-                        continue;
-                    byte[] value = tsToVal.getValue();
-                    ret.add(new KeyValue(row, family, qualifier, timestamp, value));
-                }
-            }
-        return ret;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean exists(Get get) throws IOException {
-        Result result = get(get);
-        return result != null && result.isEmpty() == false;
-    }
-
-    @Override
-    public Boolean[] exists(List<Get> gets) throws IOException {
-        return new Boolean[0];
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
-        results = batch(actions);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
-        Object[] results = new Object[actions.size()]; // same size.
-        for (int i = 0; i < actions.size(); i++) {
-            Row r = actions.get(i);
-            if (r instanceof Delete) {
-                delete((Delete) r);
-                results[i] = new Result();
-            }
-            if (r instanceof Put) {
-                put((Put) r);
-                results[i] = new Result();
-            }
-            if (r instanceof Get) {
-                Result result = get((Get) r);
-                results[i] = result;
-            }
-            if (r instanceof Increment) {
-                Result result = increment((Increment) r);
-                results[i] = result;
-            }
-            if (r instanceof Append) {
-                Result result = append((Append) r);
-                results[i] = result;
-            }
-        }
-        return results;
-    }
-
-    @Override
-    public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException {
-
-    }
-
-    @Override
-    public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException, InterruptedException {
-        return new Object[0];
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Result get(Get get) throws IOException {
-        if (!data.containsKey(get.getRow()))
-            return new Result();
-        byte[] row = get.getRow();
-        List<KeyValue> kvs = new ArrayList<KeyValue>();
-        if (!get.hasFamilies()) {
-            kvs = toKeyValue(row, data.get(row), get.getMaxVersions());
-        } else {
-            for (byte[] family : get.getFamilyMap().keySet()) {
-                if (data.get(row).get(family) == null)
-                    continue;
-                NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family);
-                if (qualifiers == null || qualifiers.isEmpty())
-                    qualifiers = data.get(row).get(family).navigableKeySet();
-                for (byte[] qualifier : qualifiers) {
-                    if (qualifier == null)
-                        qualifier = "".getBytes();
-                    if (!data.get(row).containsKey(family) || !data.get(row).get(family).containsKey(qualifier) || data.get(row).get(family).get(qualifier).isEmpty())
-                        continue;
-                    Map.Entry<Long, byte[]> timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry();
-                    kvs.add(new KeyValue(row, family, qualifier, timestampAndValue.getKey(), timestampAndValue.getValue()));
-                }
-            }
-        }
-        Filter filter = get.getFilter();
-        if (filter != null) {
-            kvs = filter(filter, kvs);
-        }
-
-        return new Result(kvs);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Result[] get(List<Get> gets) throws IOException {
-        List<Result> results = new ArrayList<Result>();
-        for (Get g : gets) {
-            results.add(get(g));
-        }
-        return results.toArray(new Result[results.size()]);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
-        // FIXME: implement
-        return null;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public ResultScanner getScanner(Scan scan) throws IOException {
-        final List<Result> ret = new ArrayList<Result>();
-        byte[] st = scan.getStartRow();
-        byte[] sp = scan.getStopRow();
-        Filter filter = scan.getFilter();
-
-        for (byte[] row : data.keySet()) {
-            // if row is equal to startRow emit it. When startRow (inclusive) and
-            // stopRow (exclusive) is the same, it should not be excluded which would
-            // happen w/o this control.
-            if (st != null && st.length > 0 && Bytes.BYTES_COMPARATOR.compare(st, row) != 0) {
-                // if row is before startRow do not emit, pass to next row
-                if (st != null && st.length > 0 && Bytes.BYTES_COMPARATOR.compare(st, row) > 0)
-                    continue;
-                // if row is equal to stopRow or after it do not emit, stop iteration
-                if (sp != null && sp.length > 0 && Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0)
-                    break;
-            }
-
-            List<KeyValue> kvs = null;
-            if (!scan.hasFamilies()) {
-                kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions());
-            } else {
-                kvs = new ArrayList<KeyValue>();
-                for (byte[] family : scan.getFamilyMap().keySet()) {
-                    if (data.get(row).get(family) == null)
-                        continue;
-                    NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
-                    if (qualifiers == null || qualifiers.isEmpty())
-                        qualifiers = data.get(row).get(family).navigableKeySet();
-                    for (byte[] qualifier : qualifiers) {
-                        if (data.get(row).get(family).get(qualifier) == null)
-                            continue;
-                        for (Long timestamp : data.get(row).get(family).get(qualifier).descendingKeySet()) {
-                            if (timestamp < scan.getTimeRange().getMin())
-                                continue;
-                            if (timestamp > scan.getTimeRange().getMax())
-                                continue;
-                            byte[] value = data.get(row).get(family).get(qualifier).get(timestamp);
-                            kvs.add(new KeyValue(row, family, qualifier, timestamp, value));
-                            if (kvs.size() == scan.getMaxVersions()) {
-                                break;
-                            }
-                        }
-                    }
-                }
-            }
-            if (filter != null) {
-                kvs = filter(filter, kvs);
-                // Check for early out optimization
-                if (filter.filterAllRemaining()) {
-                    break;
-                }
-            }
-            if (!kvs.isEmpty()) {
-                ret.add(new Result(kvs));
-            }
-        }
-
-        return new ResultScanner() {
-            private final Iterator<Result> iterator = ret.iterator();
-
-            public Iterator<Result> iterator() {
-                return iterator;
-            }
-
-            public Result[] next(int nbRows) throws IOException {
-                ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
-                for (int i = 0; i < nbRows; i++) {
-                    Result next = next();
-                    if (next != null) {
-                        resultSets.add(next);
-                    } else {
-                        break;
-                    }
-                }
-                return resultSets.toArray(new Result[resultSets.size()]);
-            }
-
-            public Result next() throws IOException {
-                try {
-                    return iterator().next();
-                } catch (NoSuchElementException e) {
-                    return null;
-                }
-            }
-
-            public void close() {
-            }
-        };
-    }
-
-    /**
-     * Follows the logical flow through the filter methods for a single row.
-     *
-     * @param filter HBase filter.
-     * @param kvs    List of a row's KeyValues
-     * @return List of KeyValues that were not filtered.
-     */
-    private List<KeyValue> filter(Filter filter, List<KeyValue> kvs) throws IOException {
-        filter.reset();
-
-        List<KeyValue> tmp = new ArrayList<KeyValue>(kvs.size());
-        tmp.addAll(kvs);
-
-        /*
-         * Note. Filter flow for a single row. Adapted from
-         * "HBase: The Definitive Guide" (p. 163) by Lars George, 2011.
-         * See Figure 4-2 on p. 163.
-         */
-        boolean filteredOnRowKey = false;
-        List<KeyValue> nkvs = new ArrayList<KeyValue>(tmp.size());
-        for (KeyValue kv : tmp) {
-            if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
-                filteredOnRowKey = true;
-                break;
-            }
-            Filter.ReturnCode filterResult = filter.filterKeyValue(kv);
-            if (filterResult == Filter.ReturnCode.INCLUDE) {
-                nkvs.add(kv);
-            } else if (filterResult == Filter.ReturnCode.NEXT_ROW) {
-                break;
-            } else if (filterResult == Filter.ReturnCode.NEXT_COL || filterResult == Filter.ReturnCode.SKIP) {
-                continue;
-            }
-            /*
-             * Ignoring next key hint which is a optimization to reduce file
-             * system IO
-             */
-        }
-        if (filter.hasFilterRow() && !filteredOnRowKey) {
-            filter.filterRow(nkvs);
-        }
-        if (filter.filterRow() || filteredOnRowKey) {
-            nkvs.clear();
-        }
-        tmp = nkvs;
-        return tmp;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public ResultScanner getScanner(byte[] family) throws IOException {
-        Scan scan = new Scan();
-        scan.addFamily(family);
-        return getScanner(scan);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
-        Scan scan = new Scan();
-        scan.addColumn(family, qualifier);
-        return getScanner(scan);
-    }
-
-    private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject) {
-        V data = map.get(key);
-        if (data == null) {
-            data = newObject;
-            map.put(key, data);
-        }
-        return data;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void put(Put put) throws IOException {
-        byte[] row = put.getRow();
-        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
-        for (byte[] family : put.getFamilyMap().keySet()) {
-            if (columnFamilies.contains(new String(family)) == false) {
-                throw new RuntimeException("Not Exists columnFamily : " + new String(family));
-            }
-            NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR));
-            for (KeyValue kv : put.getFamilyMap().get(family)) {
-                kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis()));
-                byte[] qualifier = kv.getQualifier();
-                NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>());
-                qualifierData.put(kv.getTimestamp(), kv.getValue());
-            }
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void put(List<Put> puts) throws IOException {
-        for (Put put : puts) {
-            put(put);
-        }
-
-    }
-
-    private boolean check(byte[] row, byte[] family, byte[] qualifier, byte[] value) {
-        if (value == null || value.length == 0)
-            return !data.containsKey(row) || !data.get(row).containsKey(family) || !data.get(row).get(family).containsKey(qualifier);
-        else
-            return data.containsKey(row) && data.get(row).containsKey(family) && data.get(row).get(family).containsKey(qualifier) && !data.get(row).get(family).get(qualifier).isEmpty() && Arrays.equals(data.get(row).get(family).get(qualifier).lastEntry().getValue(), value);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
-        if (check(row, family, qualifier, value)) {
-            put(put);
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void delete(Delete delete) throws IOException {
-        byte[] row = delete.getRow();
-        if (data.get(row) == null)
-            return;
-        if (delete.getFamilyMap().size() == 0) {
-            data.remove(row);
-            return;
-        }
-        for (byte[] family : delete.getFamilyMap().keySet()) {
-            if (data.get(row).get(family) == null)
-                continue;
-            if (delete.getFamilyMap().get(family).isEmpty()) {
-                data.get(row).remove(family);
-                continue;
-            }
-            for (KeyValue kv : delete.getFamilyMap().get(family)) {
-                if (kv.isDeleteFamily()) {
-                    data.get(row).get(kv.getFamily()).clear();
-                } else {
-                    data.get(row).get(kv.getFamily()).remove(kv.getQualifier());
-                }
-            }
-            if (data.get(row).get(family).isEmpty()) {
-                data.get(row).remove(family);
-            }
-        }
-        if (data.get(row).isEmpty()) {
-            data.remove(row);
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void delete(List<Delete> deletes) throws IOException {
-        for (Delete delete : deletes) {
-            delete(delete);
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException {
-        if (check(row, family, qualifier, value)) {
-            delete(delete);
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public Result increment(Increment increment) throws IOException {
-        throw new NotImplementedException();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
-        return incrementColumnValue(row, family, qualifier, amount, true);
-    }
-
-    @Override
-    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException {
-        return 0;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
-        if (check(row, family, qualifier, null)) {
-            Put put = new Put(row);
-            put.add(family, qualifier, Bytes.toBytes(amount));
-            put(put);
-            return amount;
-        }
-        long newValue = Bytes.toLong(data.get(row).get(family).get(qualifier).lastEntry().getValue()) + amount;
-        data.get(row).get(family).get(qualifier).put(System.currentTimeMillis(), Bytes.toBytes(newValue));
-        return newValue;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isAutoFlush() {
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void flushCommits() throws IOException {
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void close() throws IOException {
-    }
-
-    @Override
-    public CoprocessorRpcChannel coprocessorService(byte[] row) {
-        throw new NotImplementedException();
-
-    }
-
-    @Override
-    public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws ServiceException, Throwable {
-        throw new NotImplementedException();
-
-    }
-
-    @Override
-    public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) throws ServiceException, Throwable {
-        throw new NotImplementedException();
-
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void setAutoFlush(boolean autoFlush) {
-        throw new NotImplementedException();
-
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
-        throw new NotImplementedException();
-
-    }
-
-    @Override
-    public void setAutoFlushTo(boolean autoFlush) {
-        throw new NotImplementedException();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public long getWriteBufferSize() {
-        throw new NotImplementedException();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void setWriteBufferSize(long writeBufferSize) throws IOException {
-        throw new NotImplementedException();
-
-    }
-
-    @Override
-    public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
-        throw new NotImplementedException();
-
-    }
-
-    @Override
-    public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws ServiceException, Throwable {
-        throw new NotImplementedException();
-
-    }
-
-    @Override
-    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
-        throw new NotImplementedException();
-
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3011f6fd/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
index 847882b..7d5a354 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
@@ -67,7 +67,6 @@ public class MergeCuboidFromHBaseJob extends CuboidJob {
             CubeManager cubeMgr = CubeManager.getInstance(config);
             CubeInstance cube = cubeMgr.getCube(cubeName);
 
-
             Configuration conf = this.getConf();
             HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
 
@@ -81,27 +80,16 @@ public class MergeCuboidFromHBaseJob extends CuboidJob {
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             FileOutputFormat.setOutputPath(job, output);
 
-
             List<Scan> scans = new ArrayList<Scan>();
             for (String htable : StringSplitter.split(getOptionValue(OPTION_INPUT_PATH), ",")) {
                 Scan scan = new Scan();
-                scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
-                scan.setCacheBlocks(false);  // don't set to true for MR jobs
+                scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+                scan.setCacheBlocks(false); // don't set to true for MR jobs
                 scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
                 scans.add(scan);
             }
 
-            TableMapReduceUtil.initTableMapperJob(scans, MergeCuboidFromHBaseMapper.class, ImmutableBytesWritable.class,
-                    Text.class, job);
-
-            /*
-            // Reducer - only one
-            job.setReducerClass(CuboidReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(Text.class);
-
-*/
+            TableMapReduceUtil.initTableMapperJob(scans, MergeCuboidFromHBaseMapper.class, ImmutableBytesWritable.class, Text.class, job);
 
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
@@ -110,11 +98,8 @@ public class MergeCuboidFromHBaseJob extends CuboidJob {
             // add metadata to distributed cache
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 
-
             HTable htable = new HTable(conf, htableName);
-            HFileOutputFormat.configureIncrementalLoad(job,
-                    htable);
-
+            HFileOutputFormat.configureIncrementalLoad(job, htable);
 
             // set Reducer; This need be after configureIncrementalLoad, to overwrite the default reducer class
             job.setReducerClass(InMemCuboidReducer.class);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3011f6fd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 809efd2..8276824 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,7 @@
         <zookeeper.version>3.4.6</zookeeper.version>
         <hive.version>0.14.0</hive.version>
         <hive-hcatalog.version>0.14.0</hive-hcatalog.version>
-        <hbase-hadoop2.version>0.98.8-hadoop2</hbase-hadoop2.version>
+        <hbase-hadoop2.version>0.98.4-hadoop2</hbase-hadoop2.version>
         <kafka.version>0.8.1</kafka.version>
 
         <!-- Dependency versions -->

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3011f6fd/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java b/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
index 1309510..f8d9902 100644
--- a/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
+++ b/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java
@@ -1,7 +1,6 @@
 package org.apache.kylin.rest.security;
 
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.kylin.common.util.MockHTable;
 import org.apache.kylin.rest.service.AclService;
 import org.apache.kylin.rest.service.UserService;
 import org.h2.util.StringUtils;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3011f6fd/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java b/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java
new file mode 100644
index 0000000..eb72f64
--- /dev/null
+++ b/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java
@@ -0,0 +1,690 @@
+/**
+ * This file is licensed to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+package org.apache.kylin.rest.security;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * MockHTable.
+ *
+ * original MockHTable (by agaoglu) : https://gist.github.com/agaoglu/613217#file_mock_h_table.java
+ *
+ * Modifications
+ *
+ * <ul>
+ *     <li>fix filter (by k-mack) : https://gist.github.com/k-mack/4600133</li>
+ *     <li>fix batch() : implement all mutate operations and fix the result[] count.</li>
+ *     <li>fix exists()</li>
+ *     <li>fix increment() : wrong return value</li>
+ *     <li>check columnFamily</li>
+ *     <li>implement mutateRow()</li>
+ *     <li>implement getTableName()</li>
+ *     <li>implement getTableDescriptor()</li>
+ *     <li>throw RuntimeException when an unimplemented method is called.</li>
+ *     <li>remove some methods for loading data, checking values ...</li>
+ * </ul>
+ */
+public class MockHTable implements HTableInterface {
+    private final String tableName;
+    private final List<String> columnFamilies = new ArrayList<>();
+
+    private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+    private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
+        return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
+    }
+
+    public MockHTable(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public MockHTable(String tableName, String... columnFamilies) {
+        this.tableName = tableName;
+        this.columnFamilies.addAll(Arrays.asList(columnFamilies));
+    }
+
+    public void addColumnFamily(String columnFamily) {
+        this.columnFamilies.add(columnFamily);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The name is encoded with {@link Bytes#toBytes(String)} (UTF-8) rather than
+     * {@code String.getBytes()}, so the result does not vary with the JVM's
+     * platform default charset.
+     */
+    @Override
+    public byte[] getTableName() {
+        return Bytes.toBytes(tableName);
+    }
+
+    /**
+     * Returns the table name this mock was constructed with, as a {@link TableName}.
+     *
+     * <p>Previously this returned {@code null}, which made any caller that
+     * dereferenced the result fail with a {@code NullPointerException}.
+     *
+     * @return the {@link TableName} parsed from the constructor-supplied name
+     */
+    @Override
+    public TableName getName() {
+        return TableName.valueOf(tableName);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Configuration getConfiguration() {
+        throw new RuntimeException(this.getClass() + " does NOT implement this method.");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public HTableDescriptor getTableDescriptor() throws IOException {
+        HTableDescriptor table = new HTableDescriptor(tableName);
+        for (String columnFamily : columnFamilies) {
+            table.addFamily(new HColumnDescriptor(columnFamily));
+        }
+        return table;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void mutateRow(RowMutations rm) throws IOException {
+        // currently only support Put and Delete
+        for (Mutation mutation : rm.getMutations()) {
+            if (mutation instanceof Put) {
+                put((Put) mutation);
+            } else if (mutation instanceof Delete) {
+                delete((Delete) mutation);
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Result append(Append append) throws IOException {
+        throw new RuntimeException(this.getClass() + " does NOT implement this method.");
+    }
+
+    /**
+     * Flattens one row of the in-memory store into a list of {@link KeyValue}s.
+     *
+     * <p>For every family/qualifier pair the versions are visited newest first and
+     * at most {@code maxVersions} of the versions whose timestamp lies in
+     * {@code [timestampStart, timestampEnd]} (both bounds inclusive) are emitted.
+     * Versions outside the time range are skipped without consuming the
+     * {@code maxVersions} budget.
+     *
+     * @param row            row key to stamp on every emitted KeyValue
+     * @param rowdata        family -> qualifier -> timestamp -> value map of that row
+     * @param timestampStart lowest timestamp to include
+     * @param timestampEnd   highest timestamp to include
+     * @param maxVersions    maximum number of versions emitted per column
+     * @return the selected cells, in family/qualifier map order, newest version first
+     */
+    private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) {
+        List<KeyValue> ret = new ArrayList<KeyValue>();
+        for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> familyEntry : rowdata.entrySet()) {
+            for (Map.Entry<byte[], NavigableMap<Long, byte[]>> qualifierEntry : familyEntry.getValue().entrySet()) {
+                int versionsAdded = 0;
+                for (Map.Entry<Long, byte[]> tsToVal : qualifierEntry.getValue().descendingMap().entrySet()) {
+                    if (versionsAdded == maxVersions) {
+                        break;
+                    }
+                    long timestamp = tsToVal.getKey();
+                    if (timestamp < timestampStart || timestamp > timestampEnd) {
+                        continue;
+                    }
+                    ret.add(new KeyValue(row, familyEntry.getKey(), qualifierEntry.getKey(), timestamp, tsToVal.getValue()));
+                    // Count only versions that were actually emitted.
+                    versionsAdded++;
+                }
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Implemented by running {@link #get(Get)} and reporting whether the
+     * returned result holds any cell.
+     */
+    @Override
+    public boolean exists(Get get) throws IOException {
+        final Result found = get(get);
+        if (found == null) {
+            return false;
+        }
+        return !found.isEmpty();
+    }
+
+    @Override
+    public Boolean[] exists(List<Get> gets) throws IOException {
+        return new Boolean[0];
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
+        results = batch(actions);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Each action is dispatched on its concrete type to the matching single-row
+     * method of this mock. Deletes and Puts yield an empty {@link Result}; Gets,
+     * Increments and Appends yield whatever the delegated call returns. Actions
+     * of any other type leave a {@code null} slot.
+     *
+     * @return an array with exactly one slot per action, in action order
+     */
+    @Override
+    public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
+        final Object[] outcomes = new Object[actions.size()];
+        int slot = 0;
+        for (Row action : actions) {
+            if (action instanceof Delete) {
+                delete((Delete) action);
+                outcomes[slot] = new Result();
+            } else if (action instanceof Put) {
+                put((Put) action);
+                outcomes[slot] = new Result();
+            } else if (action instanceof Get) {
+                outcomes[slot] = get((Get) action);
+            } else if (action instanceof Increment) {
+                outcomes[slot] = increment((Increment) action);
+            } else if (action instanceof Append) {
+                outcomes[slot] = append((Append) action);
+            }
+            slot++;
+        }
+        return outcomes;
+    }
+
+    /**
+     * Mock stub: performs no work. The actions are not executed, {@code results}
+     * is left untouched and {@code callback} is never invoked.
+     */
+    @Override
+    public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException {
+        // NOTE(review): unlike the other unimplemented methods in this class this
+        // one fails silently instead of throwing -- confirm that is intended.
+
+    }
+
+    /**
+     * Mock stub: the actions are not executed and {@code callback} is never
+     * invoked.
+     *
+     * @return always a zero-length array, regardless of {@code actions.size()}
+     */
+    @Override
+    public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException, InterruptedException {
+        // NOTE(review): silently returns an empty array instead of throwing like
+        // the other unimplemented methods -- confirm that is intended.
+        return new Object[0];
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Mock behavior: an unknown row yields an empty {@link Result}. When the
+     * Get names no family, every column of the row is returned with up to
+     * {@code get.getMaxVersions()} versions. When families are named, only the
+     * newest version of each requested column is returned. A filter set on the
+     * Get is applied to the collected cells afterwards.
+     */
+    @Override
+    public Result get(Get get) throws IOException {
+        final byte[] rowKey = get.getRow();
+        if (!data.containsKey(rowKey)) {
+            return new Result();
+        }
+        final NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = data.get(rowKey);
+
+        List<KeyValue> cells;
+        if (get.hasFamilies()) {
+            cells = new ArrayList<KeyValue>();
+            for (byte[] family : get.getFamilyMap().keySet()) {
+                final NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = rowData.get(family);
+                if (familyData == null) {
+                    continue;
+                }
+                NavigableSet<byte[]> wanted = get.getFamilyMap().get(family);
+                if (wanted == null || wanted.isEmpty()) {
+                    // no explicit qualifiers: take every column stored in the family
+                    wanted = familyData.navigableKeySet();
+                }
+                for (byte[] requested : wanted) {
+                    // a null qualifier addresses the empty-qualifier column
+                    final byte[] qualifier = (requested == null) ? new byte[0] : requested;
+                    final NavigableMap<Long, byte[]> versions = familyData.get(qualifier);
+                    if (versions == null || versions.isEmpty()) {
+                        continue;
+                    }
+                    final Map.Entry<Long, byte[]> newest = versions.lastEntry();
+                    cells.add(new KeyValue(rowKey, family, qualifier, newest.getKey(), newest.getValue()));
+                }
+            }
+        } else {
+            cells = toKeyValue(rowKey, rowData, get.getMaxVersions());
+        }
+
+        final Filter rowFilter = get.getFilter();
+        if (rowFilter != null) {
+            cells = filter(rowFilter, cells);
+        }
+        return new Result(cells);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Result[] get(List<Get> gets) throws IOException {
+        List<Result> results = new ArrayList<Result>();
+        for (Get g : gets) {
+            results.add(get(g));
+        }
+        return results.toArray(new Result[results.size()]);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
+        // FIXME: implement
+        return null;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Mock behavior: the whole scan is evaluated eagerly and the returned
+     * scanner iterates over the pre-computed results.
+     * <ul>
+     *     <li>Start row is inclusive, stop row is exclusive. A row equal to the
+     *     start row is always emitted, even when start and stop row are the same.</li>
+     *     <li>The stop row is honored whether or not a start row is set
+     *     (previously it was ignored when the scan had no start row).</li>
+     *     <li>{@code scan.getMaxVersions()} is applied per column (previously it was
+     *     compared against the number of cells collected for the whole row).</li>
+     *     <li>Rows that end up with no cells are not returned.</li>
+     * </ul>
+     */
+    @Override
+    public ResultScanner getScanner(Scan scan) throws IOException {
+        final List<Result> ret = new ArrayList<Result>();
+        byte[] st = scan.getStartRow();
+        byte[] sp = scan.getStopRow();
+        boolean hasStart = st != null && st.length > 0;
+        boolean hasStop = sp != null && sp.length > 0;
+        long minStamp = scan.getTimeRange().getMin();
+        long maxStamp = scan.getTimeRange().getMax();
+        int maxVersions = scan.getMaxVersions();
+        Filter filter = scan.getFilter();
+
+        for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> rowEntry : data.entrySet()) {
+            byte[] row = rowEntry.getKey();
+            NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = rowEntry.getValue();
+
+            // A row equal to startRow is always emitted: when startRow (inclusive)
+            // and stopRow (exclusive) are the same it must not be excluded.
+            boolean isStartRow = hasStart && Bytes.BYTES_COMPARATOR.compare(st, row) == 0;
+            if (!isStartRow) {
+                // row sorts before startRow: skip it and move on
+                if (hasStart && Bytes.BYTES_COMPARATOR.compare(st, row) > 0)
+                    continue;
+                // row is at or past stopRow: rows are sorted, so nothing further matches
+                if (hasStop && Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0)
+                    break;
+            }
+
+            List<KeyValue> kvs;
+            if (!scan.hasFamilies()) {
+                kvs = toKeyValue(row, rowData, minStamp, maxStamp, maxVersions);
+            } else {
+                kvs = new ArrayList<KeyValue>();
+                for (byte[] family : scan.getFamilyMap().keySet()) {
+                    NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = rowData.get(family);
+                    if (familyData == null)
+                        continue;
+                    NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
+                    if (qualifiers == null || qualifiers.isEmpty())
+                        qualifiers = familyData.navigableKeySet();
+                    for (byte[] qualifier : qualifiers) {
+                        NavigableMap<Long, byte[]> versions = familyData.get(qualifier);
+                        if (versions == null)
+                            continue;
+                        // newest first; the version budget is tracked per column
+                        int versionsAdded = 0;
+                        for (Map.Entry<Long, byte[]> version : versions.descendingMap().entrySet()) {
+                            if (versionsAdded == maxVersions)
+                                break;
+                            long timestamp = version.getKey();
+                            if (timestamp < minStamp || timestamp > maxStamp)
+                                continue;
+                            kvs.add(new KeyValue(row, family, qualifier, timestamp, version.getValue()));
+                            versionsAdded++;
+                        }
+                    }
+                }
+            }
+            if (filter != null) {
+                kvs = filter(filter, kvs);
+                // Check for early out optimization
+                if (filter.filterAllRemaining()) {
+                    break;
+                }
+            }
+            if (!kvs.isEmpty()) {
+                ret.add(new Result(kvs));
+            }
+        }
+
+        return new ResultScanner() {
+            // single shared iterator: iterator(), next() and next(int) all advance it
+            private final Iterator<Result> iterator = ret.iterator();
+
+            public Iterator<Result> iterator() {
+                return iterator;
+            }
+
+            public Result[] next(int nbRows) throws IOException {
+                ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
+                for (int i = 0; i < nbRows; i++) {
+                    Result next = next();
+                    if (next != null) {
+                        resultSets.add(next);
+                    } else {
+                        break;
+                    }
+                }
+                return resultSets.toArray(new Result[resultSets.size()]);
+            }
+
+            public Result next() throws IOException {
+                // null signals exhaustion
+                return iterator.hasNext() ? iterator.next() : null;
+            }
+
+            public void close() {
+            }
+        };
+    }
+
+    /**
+     * Follows the logical flow through the filter methods for a single row.
+     *
+     * <p>The filter is reset first, then each KeyValue is offered to
+     * {@code filterRowKey} and {@code filterKeyValue} in list order, and finally
+     * the row-level hooks ({@code filterRow(List)} / {@code filterRow()}) are run.
+     * The input list itself is not modified; a new list is returned.
+     *
+     * @param filter HBase filter.
+     * @param kvs    List of a row's KeyValues
+     * @return List of KeyValues that were not filtered.
+     * @throws IOException declared to propagate failures from the filter callbacks
+     */
+    private List<KeyValue> filter(Filter filter, List<KeyValue> kvs) throws IOException {
+        filter.reset();
+
+        // work on a copy so the caller's list is never touched
+        List<KeyValue> tmp = new ArrayList<KeyValue>(kvs.size());
+        tmp.addAll(kvs);
+
+        /*
+         * Note. Filter flow for a single row. Adapted from
+         * "HBase: The Definitive Guide" (p. 163) by Lars George, 2011.
+         * See Figure 4-2 on p. 163.
+         */
+        boolean filteredOnRowKey = false;
+        List<KeyValue> nkvs = new ArrayList<KeyValue>(tmp.size());
+        for (KeyValue kv : tmp) {
+            // row key rejected: stop looking at this row, it is dropped below
+            if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
+                filteredOnRowKey = true;
+                break;
+            }
+            Filter.ReturnCode filterResult = filter.filterKeyValue(kv);
+            if (filterResult == Filter.ReturnCode.INCLUDE) {
+                nkvs.add(kv);
+            } else if (filterResult == Filter.ReturnCode.NEXT_ROW) {
+                break;
+            } else if (filterResult == Filter.ReturnCode.NEXT_COL || filterResult == Filter.ReturnCode.SKIP) {
+                continue;
+            }
+            // NOTE(review): any other return code falls through here and the
+            // KeyValue is not added -- confirm that is the intended handling for
+            // codes other than the four tested above.
+            /*
+             * Ignoring next key hint which is a optimization to reduce file
+             * system IO
+             */
+        }
+        // give the filter a chance to edit the surviving cells of the row
+        if (filter.hasFilterRow() && !filteredOnRowKey) {
+            filter.filterRow(nkvs);
+        }
+        // row-level veto: drop everything collected for this row
+        if (filter.filterRow() || filteredOnRowKey) {
+            nkvs.clear();
+        }
+        tmp = nkvs;
+        return tmp;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Builds a default {@link Scan} restricted to {@code family} and delegates
+     * to {@link #getScanner(Scan)}.
+     */
+    @Override
+    public ResultScanner getScanner(byte[] family) throws IOException {
+        final Scan familyScan = new Scan().addFamily(family);
+        return getScanner(familyScan);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Builds a default {@link Scan} restricted to the single column
+     * {@code family:qualifier} and delegates to {@link #getScanner(Scan)}.
+     */
+    @Override
+    public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
+        final Scan columnScan = new Scan().addColumn(family, qualifier);
+        return getScanner(columnScan);
+    }
+
+    private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject) {
+        V data = map.get(key);
+        if (data == null) {
+            data = newObject;
+            map.put(key, data);
+        }
+        return data;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Mock behavior: every column family named by the Put must have been
+     * registered on this mock (constructor or {@link #addColumnFamily(String)}).
+     * All families are validated before anything is written, so a rejected Put
+     * leaves the table unchanged (previously the row entry, and any families
+     * preceding the unknown one, were already stored when the exception was
+     * thrown). Cells without an explicit timestamp are stamped with the current
+     * time.
+     *
+     * @throws RuntimeException if the Put references an unregistered column family
+     */
+    @Override
+    public void put(Put put) throws IOException {
+        // Validate up front so a bad Put cannot leave partial state behind.
+        for (byte[] family : put.getFamilyMap().keySet()) {
+            // Bytes.toString decodes as UTF-8, independent of the platform charset
+            String familyName = Bytes.toString(family);
+            if (!columnFamilies.contains(familyName)) {
+                throw new RuntimeException("Not Exists columnFamily : " + familyName);
+            }
+        }
+
+        byte[] row = put.getRow();
+        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
+        for (byte[] family : put.getFamilyMap().keySet()) {
+            NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR));
+            for (KeyValue kv : put.getFamilyMap().get(family)) {
+                // replaces the "latest" placeholder timestamp, if any, with now
+                kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis()));
+                byte[] qualifier = kv.getQualifier();
+                NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>());
+                qualifierData.put(kv.getTimestamp(), kv.getValue());
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void put(List<Put> puts) throws IOException {
+        for (Put put : puts) {
+            put(put);
+        }
+
+    }
+
+    private boolean check(byte[] row, byte[] family, byte[] qualifier, byte[] value) {
+        if (value == null || value.length == 0)
+            return !data.containsKey(row) || !data.get(row).containsKey(family) || !data.get(row).get(family).containsKey(qualifier);
+        else
+            return data.containsKey(row) && data.get(row).containsKey(family) && data.get(row).get(family).containsKey(qualifier) && !data.get(row).get(family).get(qualifier).isEmpty() && Arrays.equals(data.get(row).get(family).get(qualifier).lastEntry().getValue(), value);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The Put is applied only when the precondition on
+     * {@code row/family/qualifier} holds; see {@code check} for how a
+     * {@code null} or empty {@code value} is interpreted.
+     */
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
+        final boolean preconditionHolds = check(row, family, qualifier, value);
+        if (!preconditionHolds) {
+            return false;
+        }
+        put(put);
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Mock behavior: a Delete with no families removes the whole row. For each
+     * named family, an empty cell list removes the family; otherwise a
+     * delete-family marker clears the family and any other marker removes the
+     * addressed column with all of its versions. Families and rows that become
+     * empty are pruned. Unknown rows and families are ignored.
+     */
+    @Override
+    public void delete(Delete delete) throws IOException {
+        final byte[] rowKey = delete.getRow();
+        final NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = data.get(rowKey);
+        if (rowData == null) {
+            return;
+        }
+        if (delete.getFamilyMap().isEmpty()) {
+            data.remove(rowKey);
+            return;
+        }
+        for (byte[] family : delete.getFamilyMap().keySet()) {
+            if (rowData.get(family) == null) {
+                continue;
+            }
+            if (delete.getFamilyMap().get(family).isEmpty()) {
+                rowData.remove(family);
+                continue;
+            }
+            for (KeyValue marker : delete.getFamilyMap().get(family)) {
+                final NavigableMap<byte[], NavigableMap<Long, byte[]>> target = rowData.get(marker.getFamily());
+                if (marker.isDeleteFamily()) {
+                    target.clear();
+                } else {
+                    target.remove(marker.getQualifier());
+                }
+            }
+            // prune the family once its last column is gone
+            if (rowData.get(family).isEmpty()) {
+                rowData.remove(family);
+            }
+        }
+        // prune the row once its last family is gone
+        if (rowData.isEmpty()) {
+            data.remove(rowKey);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void delete(List<Delete> deletes) throws IOException {
+        for (Delete delete : deletes) {
+            delete(delete);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The Delete is applied only when the precondition on
+     * {@code row/family/qualifier} holds; see {@code check} for how a
+     * {@code null} or empty {@code value} is interpreted.
+     */
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException {
+        final boolean preconditionHolds = check(row, family, qualifier, value);
+        if (!preconditionHolds) {
+            return false;
+        }
+        delete(delete);
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Result increment(Increment increment) throws IOException {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
+        return incrementColumnValue(row, family, qualifier, amount, true);
+    }
+
+    /**
+     * Increments a counter column, ignoring {@code durability} (the mock has no
+     * write-ahead log).
+     *
+     * <p>Previously this overload returned 0 without touching the table; it now
+     * delegates to the working boolean overload of {@code incrementColumnValue}.
+     *
+     * @param durability accepted for interface compatibility and otherwise unused
+     * @return the counter value after the increment
+     * @throws IOException if the underlying increment fails
+     */
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException {
+        return incrementColumnValue(row, family, qualifier, amount, true);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Mock behavior: when the column does not exist yet it is created through
+     * {@link #put(Put)} with {@code amount} as its initial value. Otherwise the
+     * newest stored version is decoded as a long, {@code amount} is added, and the
+     * sum is stored as a new version stamped with the current time.
+     * {@code writeToWAL} is not used.
+     */
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
+        final boolean columnAbsent = check(row, family, qualifier, null);
+        if (columnAbsent) {
+            final Put initial = new Put(row);
+            initial.add(family, qualifier, Bytes.toBytes(amount));
+            put(initial);
+            return amount;
+        }
+        final NavigableMap<Long, byte[]> versions = data.get(row).get(family).get(qualifier);
+        final long updated = Bytes.toLong(versions.lastEntry().getValue()) + amount;
+        versions.put(System.currentTimeMillis(), Bytes.toBytes(updated));
+        return updated;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isAutoFlush() {
+        return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void flushCommits() throws IOException {
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public CoprocessorRpcChannel coprocessorService(byte[] row) {
+        throw new NotImplementedException();
+
+    }
+
+    @Override
+    public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws ServiceException, Throwable {
+        throw new NotImplementedException();
+
+    }
+
+    @Override
+    public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) throws ServiceException, Throwable {
+        throw new NotImplementedException();
+
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void setAutoFlush(boolean autoFlush) {
+        throw new NotImplementedException();
+
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+        throw new NotImplementedException();
+
+    }
+
+    @Override
+    public void setAutoFlushTo(boolean autoFlush) {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public long getWriteBufferSize() {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void setWriteBufferSize(long writeBufferSize) throws IOException {
+        throw new NotImplementedException();
+
+    }
+
+    @Override
+    public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
+        throw new NotImplementedException();
+
+    }
+
+    @Override
+    public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws ServiceException, Throwable {
+        throw new NotImplementedException();
+
+    }
+
+    //@Override  (only since 0.98.8)
+    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
+        throw new NotImplementedException();
+
+    }
+}
\ No newline at end of file