Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:12 UTC

[04/28] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
new file mode 100644
index 0000000..ed67109
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java
@@ -0,0 +1,85 @@
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI;
+import org.apache.kylin.storage.hbase.util.LZOSupportnessChecker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ */
+public class CubeHTableUtil {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeHTableUtil.class);
+
+    public static void createHTable(CubeDesc cubeDesc, String tableName, byte[][] splitKeys) throws IOException {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
+        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
+        // https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.html
+        tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
+        tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
+
+        Configuration conf = HBaseConfiguration.create();
+        HBaseAdmin admin = new HBaseAdmin(conf);
+
+        try {
+            if (User.isHBaseSecurityEnabled(conf)) {
+                // add coprocessor for bulk load
+                tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+            }
+
+            for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+                HColumnDescriptor cf = new HColumnDescriptor(cfDesc.getName());
+                cf.setMaxVersions(1);
+
+                if (LZOSupportnessChecker.getSupportness()) {
+                    logger.info("hbase will use lzo to compress cube data");
+                    cf.setCompressionType(Compression.Algorithm.LZO);
+                } else {
+                    logger.info("hbase will not use lzo to compress cube data");
+                }
+
+                cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
+                cf.setInMemory(false);
+                cf.setBlocksize(4 * 1024 * 1024); // set to 4MB
+                tableDesc.addFamily(cf);
+            }
+
+            if (admin.tableExists(tableName)) {
+                // admin.disableTable(tableName);
+                // admin.deleteTable(tableName);
+                throw new RuntimeException("HBase table " + tableName + " exists!");
+            }
+
+            DeployCoprocessorCLI.deployCoprocessor(tableDesc);
+
+            admin.createTable(tableDesc, splitKeys);
+            Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " was created but is not yet available");
+            logger.info("created HBase table " + tableName + ".");
+        } catch (Exception e) {
+            logger.error("Failed to create HTable", e);
+            throw e;
+        } finally {
+            admin.close();
+        }
+
+    }
+}
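
For orientation, a minimal caller sketch for the utility above (the cube name, htable name and split points are hypothetical; assumes CubeDescManager can load the descriptor from metadata):

    KylinConfig config = KylinConfig.getInstanceFromEnv();
    CubeDesc cubeDesc = CubeDescManager.getInstance(config).getCubeDesc("test_kylin_cube"); // hypothetical cube
    byte[][] splitKeys = new byte[][] { Bytes.toBytes("0001"), Bytes.toBytes("0002") }; // illustrative split points
    CubeHTableUtil.createHTable(cubeDesc, "KYLIN_EXAMPLE_HTABLE", splitKeys); // hypothetical htable name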

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
new file mode 100644
index 0000000..e42f709
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -0,0 +1,135 @@
+package org.apache.kylin.storage.hbase.steps;
+
+import java.util.List;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class HBaseMRSteps extends JobBuilderSupport {
+    
+    public HBaseMRSteps(CubeSegment seg) {
+        super(seg, null);
+    }
+
+    public void addSaveCuboidToHTableSteps(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
+        String jobId = jobFlow.getId();
+        
+        // calculate key distribution
+        jobFlow.addTask(createRangeRowkeyDistributionStep(cuboidRootPath + "*", jobId));
+        // create htable step
+        jobFlow.addTask(createCreateHTableStep(jobId));
+        // generate hfiles step
+        jobFlow.addTask(createConvertCuboidToHfileStep(cuboidRootPath + "*", jobId));
+        // bulk load step
+        jobFlow.addTask(createBulkLoadStep(jobId));
+    }
+
+    public MapReduceExecutable createRangeRowkeyDistributionStep(String inputPath, String jobId) {
+        MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
+        rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, seg);
+        appendExecCmdParameters(cmd, "input", inputPath);
+        appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(jobId));
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step");
+
+        rowkeyDistributionStep.setMapReduceParams(cmd.toString());
+        rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
+        return rowkeyDistributionStep;
+    }
+
+    public HadoopShellExecutable createCreateHTableStep(String jobId) {
+        return createCreateHTableStep(jobId, false);
+    }
+    
+    public HadoopShellExecutable createCreateHTableStepWithStats(String jobId) {
+        return createCreateHTableStep(jobId, true);
+    }
+    
+    private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats) {
+        HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
+        createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
+        StringBuilder cmd = new StringBuilder();
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "segmentname", seg.getName());
+        appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
+
+        createHtableStep.setJobParams(cmd.toString());
+        createHtableStep.setJobClass(CreateHTableJob.class);
+
+        return createHtableStep;
+    }
+
+    public MapReduceExecutable createConvertCuboidToHfileStep(String inputPath, String jobId) {
+        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
+        createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd, seg);
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "input", inputPath);
+        appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getCubeInstance().getName() + "_Step");
+
+        createHFilesStep.setMapReduceParams(cmd.toString());
+        createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
+        createHFilesStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+
+        return createHFilesStep;
+    }
+
+    public HadoopShellExecutable createBulkLoadStep(String jobId) {
+        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
+        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
+
+        StringBuilder cmd = new StringBuilder();
+        appendExecCmdParameters(cmd, "input", getHFilePath(jobId));
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+
+        bulkLoadStep.setJobParams(cmd.toString());
+        bulkLoadStep.setJobClass(BulkLoadJob.class);
+
+        return bulkLoadStep;
+    }
+    
+    public MergeGCStep createMergeGCStep() {
+        MergeGCStep result = new MergeGCStep();
+        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        result.setOldHTables(getMergingHTables());
+        return result;
+    }
+
+    public List<String> getMergingHTables() {
+        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
+        final List<String> mergingHTables = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingHTables.add(merging.getStorageLocationIdentifier());
+        }
+        return mergingHTables;
+    }
+    
+    public String getHFilePath(String jobId) {
+        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/";
+    }
+
+    public String getRowkeyDistributionOutputPath(String jobId) {
+        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
+    }
+
+}
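
For orientation, a sketch of how these four steps chain inside a cubing job flow (the segment object is assumed to exist; the cuboid root path is illustrative):

    HBaseMRSteps steps = new HBaseMRSteps(seg); // seg: the CubeSegment being built
    DefaultChainedExecutable jobFlow = new DefaultChainedExecutable();
    // appends rowkey-distribution, create-htable, convert-to-hfile and bulk-load tasks, in that order
    steps.addSaveCuboidToHTableSteps(jobFlow, "/kylin/example_job/cuboid/"); // hypothetical cuboid root path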

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java
new file mode 100644
index 0000000..b2d691e
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/InMemKeyValueCreator.java
@@ -0,0 +1,73 @@
+package org.apache.kylin.storage.hbase.steps;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import com.google.common.collect.Lists;
+
+public class InMemKeyValueCreator {
+    byte[] cfBytes;
+    byte[] qBytes;
+    long timestamp;
+
+
+    MeasureCodec codec;
+    Object[] colValues;
+    ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+    int startPosition = 0;
+
+    public InMemKeyValueCreator(HBaseColumnDesc colDesc, int startPosition) {
+
+        cfBytes = Bytes.toBytes(colDesc.getColumnFamilyName());
+        qBytes = Bytes.toBytes(colDesc.getQualifier());
+        timestamp = System.currentTimeMillis();
+
+        List<MeasureDesc> measures = Lists.newArrayList();
+        for (MeasureDesc measure : colDesc.getMeasures()) {
+            measures.add(measure);
+        }
+        codec = new MeasureCodec(measures);
+        colValues = new Object[measures.size()];
+
+        this.startPosition = startPosition;
+
+    }
+
+    public KeyValue create(Text key, Object[] measureValues) {
+        return create(key.getBytes(), 0, key.getLength(), measureValues);
+    }
+
+    public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, Object[] measureValues) {
+        for (int i = 0; i < colValues.length; i++) {
+            colValues[i] = measureValues[startPosition + i];
+        }
+
+        valueBuf.clear();
+        codec.encode(colValues, valueBuf);
+
+        return create(keyBytes, keyOffset, keyLength, valueBuf.array(), 0, valueBuf.position());
+    }
+
+
+    public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, byte[] value, int voffset, int vlen) {
+        return new KeyValue(keyBytes, keyOffset, keyLength, //
+                cfBytes, 0, cfBytes.length, //
+                qBytes, 0, qBytes.length, //
+                timestamp, KeyValue.Type.Put, //
+                value, voffset, vlen);
+    }
+
+    public KeyValue create(Text key, byte[] value, int voffset, int vlen) {
+        return create(key.getBytes(), 0, key.getLength(), value, voffset, vlen);
+    }
+
+}
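
A hedged usage sketch (the column descriptor, row key and measure values are assumed to come from the surrounding in-mem cubing code):

    InMemKeyValueCreator creator = new InMemKeyValueCreator(colDesc, 0); // 0: this column's offset into the measure array
    KeyValue kv = creator.create(rowKey, measureValues); // rowKey: Text, measureValues: Object[]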

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
new file mode 100644
index 0000000..3e49e27
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/KeyValueCreator.java
@@ -0,0 +1,104 @@
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * @author George Song (ysong1)
+ */
+public class KeyValueCreator {
+    byte[] cfBytes;
+    byte[] qBytes;
+    long timestamp;
+
+    int[] refIndex;
+    MeasureDesc[] refMeasures;
+
+    MeasureCodec codec;
+    Object[] colValues;
+    ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+    public boolean isFullCopy;
+
+    public KeyValueCreator(CubeDesc cubeDesc, HBaseColumnDesc colDesc) {
+
+        cfBytes = Bytes.toBytes(colDesc.getColumnFamilyName());
+        qBytes = Bytes.toBytes(colDesc.getQualifier());
+        timestamp = System.currentTimeMillis();
+
+        List<MeasureDesc> measures = cubeDesc.getMeasures();
+        String[] measureNames = getMeasureNames(cubeDesc);
+        String[] refs = colDesc.getMeasureRefs();
+
+        refIndex = new int[refs.length];
+        refMeasures = new MeasureDesc[refs.length];
+        for (int i = 0; i < refs.length; i++) {
+            refIndex[i] = indexOf(measureNames, refs[i]);
+            refMeasures[i] = measures.get(refIndex[i]);
+        }
+
+        codec = new MeasureCodec(refMeasures);
+        colValues = new Object[refs.length];
+
+        isFullCopy = true;
+        for (int i = 0; i < measures.size(); i++) {
+            if (refIndex.length <= i || refIndex[i] != i)
+                isFullCopy = false;
+        }
+    }
+
+    public KeyValue create(Text key, Object[] measureValues) {
+        return create(key.getBytes(), 0, key.getLength(), measureValues);
+    }
+
+    public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, Object[] measureValues) {
+        for (int i = 0; i < colValues.length; i++) {
+            colValues[i] = measureValues[refIndex[i]];
+        }
+
+        valueBuf.clear();
+        codec.encode(colValues, valueBuf);
+
+        return create(keyBytes, keyOffset, keyLength, valueBuf.array(), 0, valueBuf.position());
+    }
+
+
+    public KeyValue create(byte[] keyBytes, int keyOffset, int keyLength, byte[] value, int voffset, int vlen) {
+        return new KeyValue(keyBytes, keyOffset, keyLength, //
+                cfBytes, 0, cfBytes.length, //
+                qBytes, 0, qBytes.length, //
+                timestamp, KeyValue.Type.Put, //
+                value, voffset, vlen);
+    }
+
+    public KeyValue create(Text key, byte[] value, int voffset, int vlen) {
+        return create(key.getBytes(), 0, key.getLength(), value, voffset, vlen);
+    }
+
+    private int indexOf(String[] measureNames, String ref) {
+        for (int i = 0; i < measureNames.length; i++)
+            if (measureNames[i].equalsIgnoreCase(ref))
+                return i;
+
+        throw new IllegalArgumentException("Measure '" + ref + "' not found in " + Arrays.toString(measureNames));
+    }
+
+    private String[] getMeasureNames(CubeDesc cubeDesc) {
+        List<MeasureDesc> measures = cubeDesc.getMeasures();
+        String[] result = new String[measures.size()];
+        for (int i = 0; i < measures.size(); i++)
+            result[i] = measures.get(i).getName();
+        return result;
+    }
+
+}
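
The isFullCopy flag marks column families whose measure list matches the cube's measure order exactly, letting callers skip per-measure re-mapping. A hedged sketch (descriptors assumed loaded from cube metadata):

    KeyValueCreator creator = new KeyValueCreator(cubeDesc, colDesc);
    if (creator.isFullCopy) {
        // measure order equals cube order; the full measure array can be encoded as-is
    }
    KeyValue kv = creator.create(rowKey, measureValues);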

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
new file mode 100644
index 0000000..df42560
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Drop HBase tables that are no longer needed
+ */
+public class MergeGCStep extends AbstractExecutable {
+
+    private static final String OLD_HTABLES = "oldHTables";
+
+    private static final Logger logger = LoggerFactory.getLogger(MergeGCStep.class);
+
+    public MergeGCStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+
+        StringBuffer output = new StringBuffer();
+
+        List<String> oldTables = getOldHTables();
+        if (oldTables != null && oldTables.size() > 0) {
+            String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+            Configuration conf = HBaseConfiguration.create();
+            HBaseAdmin admin = null;
+            try {
+                admin = new HBaseAdmin(conf);
+                for (String table : oldTables) {
+                    if (admin.tableExists(table)) {
+                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
+                        String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
+                        if (metadataUrlPrefix.equalsIgnoreCase(host)) {
+                            if (admin.isTableEnabled(table)) {
+                                admin.disableTable(table);
+                            }
+                            admin.deleteTable(table);
+                            logger.debug("Dropped htable: " + table);
+                            output.append("HBase table " + table + " is dropped. \n");
+                        } else {
+                            logger.debug("Skip htable: " + table);
+                            output.append("Skip htable: " + table + ". \n");
+                        }
+                    }
+                }
+
+            } catch (IOException e) {
+                output.append("Got error when drop HBase table, exiting... \n");
+                // This should not block the merge job; Orphans should be cleaned up in StorageCleanupJob
+                return new ExecuteResult(ExecuteResult.State.ERROR, output.append(e.getLocalizedMessage()).toString());
+            } finally {
+                if (admin != null)
+                    try {
+                        admin.close();
+                    } catch (IOException e) {
+                        logger.error(e.getLocalizedMessage());
+                    }
+            }
+        }
+
+        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+    }
+
+    public void setOldHTables(List<String> ids) {
+        setParam(OLD_HTABLES, StringUtils.join(ids, ","));
+    }
+
+    private List<String> getOldHTables() {
+        final String ids = getParam(OLD_HTABLES);
+        if (ids != null) {
+            final String[] splitted = StringUtils.split(ids, ",");
+            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+            for (String id : splitted) {
+                result.add(id);
+            }
+            return result;
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+}
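
A hedged sketch of wiring this step into a merge job (jobFlow and seg are assumed to exist; see HBaseMRSteps.createMergeGCStep above):

    MergeGCStep gcStep = new HBaseMRSteps(seg).createMergeGCStep(); // records the merging segments' HTable names
    jobFlow.addTask(gcStep); // old htables are dropped only after the merged segment goes READY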

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
new file mode 100644
index 0000000..877b71b
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.metadata.measure.DoubleMutable;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+
+/**
+ */
+public class RowValueDecoder implements Cloneable {
+
+    private final HBaseColumnDesc hbaseColumn;
+    private final byte[] hbaseColumnFamily;
+    private final byte[] hbaseColumnQualifier;
+
+    private final MeasureCodec codec;
+    private final BitSet projectionIndex;
+    private final MeasureDesc[] measures;
+    private Object[] values;
+
+    public RowValueDecoder(HBaseColumnDesc hbaseColumn) {
+        this.hbaseColumn = hbaseColumn;
+        this.hbaseColumnFamily = Bytes.toBytes(hbaseColumn.getColumnFamilyName());
+        this.hbaseColumnQualifier = Bytes.toBytes(hbaseColumn.getQualifier());
+        this.projectionIndex = new BitSet();
+        this.measures = hbaseColumn.getMeasures();
+        this.codec = new MeasureCodec(measures);
+        this.values = new Object[measures.length];
+    }
+
+    public void decode(Result hbaseRow) {
+        decode(hbaseRow, true);
+    }
+
+    public void decode(Result hbaseRow, boolean convertToJavaObject) {
+        decode(hbaseRow.getValueAsByteBuffer(hbaseColumnFamily, hbaseColumnQualifier), convertToJavaObject);
+    }
+
+    public void decode(byte[] bytes) {
+        decode(bytes, true);
+    }
+
+    public void decode(byte[] bytes, boolean convertToJavaObject) {
+        decode(ByteBuffer.wrap(bytes), convertToJavaObject);
+    }
+
+    private void decode(ByteBuffer buffer, boolean convertToJavaObject) {
+        codec.decode(buffer, values);
+        if (convertToJavaObject) {
+            convertToJavaObjects(values, values);
+        }
+    }
+
+    private void convertToJavaObjects(Object[] mapredObjs, Object[] results) {
+        for (int i = 0; i < mapredObjs.length; i++) {
+            Object o = mapredObjs[i];
+
+            if (o instanceof LongMutable)
+                o = ((LongMutable) o).get();
+            else if (o instanceof DoubleMutable)
+                o = ((DoubleMutable) o).get();
+
+            results[i] = o;
+        }
+    }
+
+    public void setIndex(int bitIndex) {
+        projectionIndex.set(bitIndex);
+    }
+
+    public HBaseColumnDesc getHBaseColumn() {
+        return hbaseColumn;
+    }
+
+    public BitSet getProjectionIndex() {
+        return projectionIndex;
+    }
+
+    public Object[] getValues() {
+        return values;
+    }
+
+    public MeasureDesc[] getMeasures() {
+        return measures;
+    }
+
+    public boolean hasMemHungryCountDistinct() {
+        for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
+            FunctionDesc func = measures[i].getFunction();
+            if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static boolean hasMemHungryCountDistinct(Collection<RowValueDecoder> rowValueDecoders) {
+        for (RowValueDecoder decoder : rowValueDecoders) {
+            if (decoder.hasMemHungryCountDistinct())
+                return true;
+        }
+        return false;
+    }
+
+}
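
A hedged query-side sketch (the column descriptor and HBase Result are assumed to come from the storage engine's scan code):

    RowValueDecoder decoder = new RowValueDecoder(hbaseColumn);
    decoder.setIndex(0); // project the first measure of this column family
    decoder.decode(hbaseResult); // hbaseResult: org.apache.hadoop.hbase.client.Result
    Object[] values = decoder.getValues(); // mutables already unwrapped to Long/Double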

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
new file mode 100644
index 0000000..5b3949f
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.regex.Matcher;
+
+/**
+ * @author yangli9
+ */
+public class DeployCoprocessorCLI {
+
+    private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
+
+    public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver";
+    public static final String ENDPOINT_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
+
+    public static void main(String[] args) throws IOException {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        Configuration hconf = HBaseConnection.newHBaseConfiguration(kylinConfig.getStorageUrl());
+        FileSystem fileSystem = FileSystem.get(hconf);
+        HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
+
+        String localCoprocessorJar = new File(args[0]).getAbsolutePath();
+        logger.info("Identify coprocessor jar " + localCoprocessorJar);
+
+        List<String> tableNames = getHTableNames(kylinConfig);
+        logger.info("Identify tables " + tableNames);
+
+        Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
+        logger.info("Old coprocessor jar: " + oldJarPaths);
+
+        Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
+        logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
+
+        List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
+
+        // Don't remove old jars, missing coprocessor jar will fail hbase
+        // removeOldJars(oldJarPaths, fileSystem);
+
+        hbaseAdmin.close();
+
+        logger.info("Processed " + processedTables);
+        logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
+    }
+
+    public static void deployCoprocessor(HTableDescriptor tableDesc) {
+        try {
+            initHTableCoprocessor(tableDesc);
+            logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor.");
+
+        } catch (Exception ex) {
+            logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex);
+            logger.error("Will try creating the table without coprocessor.");
+        }
+    }
+
+    private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        Configuration hconf = HadoopUtil.getCurrentConfiguration();
+        FileSystem fileSystem = FileSystem.get(hconf);
+
+        String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();
+        Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null);
+
+        DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+    }
+
+    public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
+        logger.info("Add coprocessor on " + desc.getNameAsString());
+        desc.addCoprocessor(ENDPOINT_CLS_NAME, hdfsCoprocessorJar, 1000, null);
+        desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null);
+    }
+
+    public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
+        logger.info("Disable " + tableName);
+        hbaseAdmin.disableTable(tableName);
+
+        logger.info("Unset coprocessor on " + tableName);
+        HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+        while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) {
+            desc.removeCoprocessor(OBSERVER_CLS_NAME);
+        }
+        while (desc.hasCoprocessor(ENDPOINT_CLS_NAME)) {
+            desc.removeCoprocessor(ENDPOINT_CLS_NAME);
+        }
+
+        addCoprocessorOnHTable(desc, hdfsCoprocessorJar);
+        hbaseAdmin.modifyTable(tableName, desc);
+
+        logger.info("Enable " + tableName);
+        hbaseAdmin.enableTable(tableName);
+    }
+
+    private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
+        List<String> processed = new ArrayList<String>();
+
+        for (String tableName : tableNames) {
+            try {
+                resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
+                processed.add(tableName);
+            } catch (IOException ex) {
+                logger.error("Error processing " + tableName, ex);
+            }
+        }
+        return processed;
+    }
+
+    public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
+        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
+        FileStatus newestJar = null;
+        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+            if (fileStatus.getPath().toString().endsWith(".jar")) {
+                if (newestJar == null) {
+                    newestJar = fileStatus;
+                } else {
+                    if (newestJar.getModificationTime() < fileStatus.getModificationTime())
+                        newestJar = fileStatus;
+                }
+            }
+        }
+        if (newestJar == null)
+            return null;
+
+        Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null);
+        logger.info("The newest coprocessor is " + path.toString());
+        return path;
+    }
+
+    public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
+        Path uploadPath = null;
+        File localCoprocessorFile = new File(localCoprocessorJar);
+
+        // check existing jars
+        if (oldJarPaths == null) {
+            oldJarPaths = new HashSet<String>();
+        }
+        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
+        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
+            if (isSame(localCoprocessorFile, fileStatus)) {
+                uploadPath = fileStatus.getPath();
+                break;
+            }
+            String filename = fileStatus.getPath().toString();
+            if (filename.endsWith(".jar")) {
+                oldJarPaths.add(filename);
+            }
+        }
+
+        // upload if not existing
+        if (uploadPath == null) {
+            // figure out a unique new jar file name
+            Set<String> oldJarNames = new HashSet<String>();
+            for (String path : oldJarPaths) {
+                oldJarNames.add(new Path(path).getName());
+            }
+            String baseName = getBaseFileName(localCoprocessorJar);
+            String newName = null;
+            int i = 0;
+            while (newName == null) {
+                newName = baseName + "-" + (i++) + ".jar";
+                if (oldJarNames.contains(newName))
+                    newName = null;
+            }
+
+            // upload
+            uploadPath = new Path(coprocessorDir, newName);
+            FileInputStream in = null;
+            FSDataOutputStream out = null;
+            try {
+                in = new FileInputStream(localCoprocessorFile);
+                out = fileSystem.create(uploadPath);
+                IOUtils.copy(in, out);
+            } finally {
+                IOUtils.closeQuietly(in);
+                IOUtils.closeQuietly(out);
+            }
+
+            fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1);
+
+        }
+
+        uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
+        return uploadPath;
+    }
+
+    private static boolean isSame(File localCoprocessorFile, FileStatus fileStatus) {
+        return fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified();
+    }
+
+    private static String getBaseFileName(String localCoprocessorJar) {
+        File localJar = new File(localCoprocessorJar);
+        String baseName = localJar.getName();
+        if (baseName.endsWith(".jar"))
+            baseName = baseName.substring(0, baseName.length() - ".jar".length());
+        return baseName;
+    }
+
+    private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
+        String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
+        Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
+        fileSystem.mkdirs(coprocessorDir);
+        return coprocessorDir;
+    }
+
+    private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException {
+        HashSet<String> result = new HashSet<String>();
+
+        for (String tableName : tableNames) {
+            HTableDescriptor tableDescriptor = null;
+            try {
+                tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
+            } catch (TableNotFoundException e) {
+                logger.warn("Table not found " + tableName, e);
+                continue;
+            }
+
+            Matcher keyMatcher;
+            Matcher valueMatcher;
+            for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
+                keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
+                if (!keyMatcher.matches()) {
+                    continue;
+                }
+                valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
+                if (!valueMatcher.matches()) {
+                    continue;
+                }
+
+                String jarPath = valueMatcher.group(1).trim();
+                String clsName = valueMatcher.group(2).trim();
+
+                if (OBSERVER_CLS_NAME.equals(clsName)) {
+                    result.add(jarPath);
+                }
+            }
+        }
+
+        return result;
+    }
+
+    private static List<String> getHTableNames(KylinConfig config) {
+        CubeManager cubeMgr = CubeManager.getInstance(config);
+
+        ArrayList<String> result = new ArrayList<String>();
+        for (CubeInstance cube : cubeMgr.listAllCubes()) {
+            for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
+                String tableName = seg.getStorageLocationIdentifier();
+                if (!StringUtils.isBlank(tableName)) {
+                    result.add(tableName);
+                    System.out.println("added new table: " + tableName);
+                }
+            }
+        }
+
+        for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) {
+            if (ii.getStatus() == RealizationStatusEnum.READY) {
+                for (IISegment seg : ii.getSegments()) {//streaming segment is never "READY"
+                    String tableName = seg.getStorageLocationIdentifier();
+                    if (!StringUtils.isBlank(tableName)) {
+                        result.add(tableName);
+                        System.out.println("added new table: " + tableName);
+                    }
+                }
+            }
+        }
+
+        return result;
+    }
+}
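
The CLI takes the local coprocessor jar as its only argument; a hedged invocation (the classpath and jar path are placeholders):

    java -cp <kylin-and-hbase-classpath> \
        org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI /path/to/kylin-coprocessor.jar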

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
new file mode 100644
index 0000000..ba0da00
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+/** This class will come with HBase 2.0 in package org.apache.hadoop.hbase.util **/
+package org.apache.kylin.storage.hbase.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HBaseRegionSizeCalculator {
+
+    private static final Logger logger = LoggerFactory.getLogger(HBaseRegionSizeCalculator.class);
+
+    /**
+     * Maps each region to its size in bytes.
+     **/
+    private final Map<byte[], Long> sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+
+    static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable";
+
+    /**
+     * Computes size of each region for table and given column families.
+     * */
+    public HBaseRegionSizeCalculator(HTable table) throws IOException {
+        this(table, new HBaseAdmin(table.getConfiguration()));
+    }
+
+    /** Constructor for unit testing */
+    HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException {
+
+        try {
+            if (!enabled(table.getConfiguration())) {
+                logger.info("Region size calculation disabled.");
+                return;
+            }
+
+            logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\".");
+
+            // Get regions for table.
+            Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet();
+            Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+
+            for (HRegionInfo regionInfo : tableRegionInfos) {
+                tableRegions.add(regionInfo.getRegionName());
+            }
+
+            ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus();
+            Collection<ServerName> servers = clusterStatus.getServers();
+            final long megaByte = 1024L * 1024L;
+
+            // Iterate all cluster regions, filter regions from our table and
+            // compute their size.
+            for (ServerName serverName : servers) {
+                ServerLoad serverLoad = clusterStatus.getLoad(serverName);
+
+                for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
+                    byte[] regionId = regionLoad.getName();
+
+                    if (tableRegions.contains(regionId)) {
+
+                        long regionSizeBytes = regionLoad.getStorefileSizeMB() * megaByte;
+                        sizeMap.put(regionId, regionSizeBytes);
+
+                        // logger.info("Region " + regionLoad.getNameAsString()
+                        // + " has size " + regionSizeBytes);
+                    }
+                }
+            }
+        } finally {
+            hBaseAdmin.close();
+        }
+
+    }
+
+    boolean enabled(Configuration configuration) {
+        return configuration.getBoolean(ENABLE_REGIONSIZECALCULATOR, true);
+    }
+
+    /**
+     * Returns size of given region in bytes. Returns 0 if region was not found.
+     **/
+    public long getRegionSize(byte[] regionId) {
+        Long size = sizeMap.get(regionId);
+        if (size == null) {
+            logger.info("Unknown region:" + Arrays.toString(regionId));
+            return 0;
+        } else {
+            return size;
+        }
+    }
+
+    public Map<byte[], Long> getRegionSizeMap() {
+        return Collections.unmodifiableMap(sizeMap);
+    }
+}
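
A hedged usage sketch (the table name is hypothetical; HTable is the pre-1.0 HBase client API used throughout this module):

    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "KYLIN_EXAMPLE_HTABLE");
    HBaseRegionSizeCalculator calc = new HBaseRegionSizeCalculator(table);
    for (Map.Entry<byte[], Long> e : calc.getRegionSizeMap().entrySet()) {
        System.out.println(Bytes.toStringBinary(e.getKey()) + " = " + e.getValue() + " bytes");
    }
    table.close();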

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/LZOSupportnessChecker.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/LZOSupportnessChecker.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/LZOSupportnessChecker.java
new file mode 100644
index 0000000..3645bed
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/LZOSupportnessChecker.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import org.apache.hadoop.hbase.util.CompressionTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/**
+ */
+public class LZOSupportnessChecker {
+    private static final Logger log = LoggerFactory.getLogger(LZOSupportnessChecker.class);
+
+    public static boolean getSupportness() {
+        try {
+            File temp = File.createTempFile("test", ".tmp");
+            CompressionTest.main(new String[] { "file://" + temp.getAbsolutePath(), "lzo" });
+        } catch (Exception e) {
+            log.warn("LZO support is disabled. Fail to compress file with lzo: " + e.toString());
+            return false;
+        }
+        return true;
+    }
+
+    public static void main(String[] args) throws Exception {
+        System.out.println("LZO supported by current env? " + getSupportness());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
new file mode 100644
index 0000000..9985acd
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperJobLock.java
@@ -0,0 +1,84 @@
+package org.apache.kylin.storage.hbase.util;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class ZookeeperJobLock implements JobLock {
+    private Logger logger = LoggerFactory.getLogger(ZookeeperJobLock.class);
+
+    private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
+
+    private String scheduleID;
+    private InterProcessMutex sharedLock;
+    private CuratorFramework zkClient;
+
+    @Override
+    public boolean lock() {
+        this.scheduleID = schedulerId();
+        String zkConnectString = getZKConnectString();
+        if (StringUtils.isEmpty(zkConnectString)) {
+            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+        }
+
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        this.zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
+        this.zkClient.start();
+        this.sharedLock = new InterProcessMutex(zkClient, this.scheduleID);
+        boolean hasLock = false;
+        try {
+            hasLock = sharedLock.acquire(3, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            logger.warn("error acquire lock", e);
+        }
+        if (!hasLock) {
+            logger.warn("fail to acquire lock, scheduler has not been started");
+            zkClient.close();
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void unlock() {
+        releaseLock();
+    }
+
+    private String getZKConnectString() {
+        Configuration conf = HBaseConnection.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl());
+        return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    }
+
+    private void releaseLock() {
+        try {
+            if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
+                // client.setData().forPath(ZOOKEEPER_LOCK_PATH, null);
+                if (zkClient.checkExists().forPath(scheduleID) != null) {
+                    zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(scheduleID);
+                }
+            }
+        } catch (Exception e) {
+            logger.error("error release lock:" + scheduleID);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private String schedulerId() {
+        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+    }
+}
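
A hedged sketch of the lock protocol (one job engine per metadata prefix may hold the lock at a time):

    JobLock jobLock = new ZookeeperJobLock();
    if (jobLock.lock()) {
        try {
            // safe to start the job scheduler on this node
        } finally {
            jobLock.unlock();
        }
    }
    // else: another job engine instance already holds the lock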

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/CreateHTableTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/CreateHTableTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/CreateHTableTest.java
new file mode 100644
index 0000000..8b09f05
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/CreateHTableTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.storage.hbase.steps.CreateHTableJob;
+
+/**
+ * @author George Song (ysong1)
+ * 
+ */
+public class CreateHTableTest extends LocalFileMetadataTestCase {
+
+    private Configuration conf;
+
+    @Before
+    public void setup() throws Exception {
+        conf = new Configuration();
+        conf.set("fs.default.name", "file:///");
+        conf.set("mapred.job.tracker", "local");
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGetSplits() throws Exception {
+        CreateHTableJob c = new CreateHTableJob();
+
+        String input = "src/test/resources/partition_list/part-r-00000";
+
+        byte[][] splits = c.getSplits(conf, new Path(input));
+
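+        // the sample partition file carries 497 split points; spot-check the first and last keys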
+        assertEquals(497, splits.length);
+        assertArrayEquals(new byte[] { 0, 0, 0, 0, 0, 0, 15, -1, 11, 51, -45, 2 }, splits[0]);
+        assertArrayEquals(new byte[] { 0, 0, 0, 0, 0, 3, -1, -1, -54, -61, 109, -44, 1 }, splits[496]);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
new file mode 100644
index 0000000..5381187
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ITHdfsOpsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ */
+public class ITHdfsOpsTest extends HBaseMetadataTestCase {
+
+    FileSystem fileSystem;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+
+        Configuration hconf = new Configuration();
+        fileSystem = FileSystem.get(hconf);
+    }
+
+    @Test
+    public void testPath() throws IOException {
+        String hdfsWorkingDirectory = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
+        Path coprocessorDir = new Path(hdfsWorkingDirectory, "test");
+        fileSystem.mkdirs(coprocessorDir);
+
+        Path newFile = new Path(coprocessorDir, "test_file");
+        newFile = newFile.makeQualified(fileSystem.getUri(), null);
+        // close the stream even if the write fails, so the test leaves no open handles behind
+        FSDataOutputStream stream = fileSystem.create(newFile);
+        try {
+            stream.write(new byte[] { 0, 1, 2 });
+        } finally {
+            stream.close();
+        }
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/RowValueDecoderTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/RowValueDecoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/RowValueDecoderTest.java
index dcc5b06..3ff1fc1 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/RowValueDecoderTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/RowValueDecoderTest.java
@@ -35,6 +35,7 @@ import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/TestHbaseClient.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/TestHbaseClient.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/TestHbaseClient.java
new file mode 100644
index 0000000..0ea037b
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/TestHbaseClient.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.kylin.common.util.Bytes;
+
+/**
+ */
+public class TestHbaseClient {
+
+    private static boolean reverse = false;
+
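+    // print every n-choose-k selection as a 0/1 string, marking chosen positions with '0';
+    // when k > n - k, enumerate the smaller complement instead and invert the output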
+    public static void foo(int n, int k) {
+        // reset between calls; a previous invocation may have left reverse == true
+        reverse = false;
+        int t = k;
+        if (n - k < k) {
+            t = n - k;
+            reverse = true;
+        }
+        boolean[] flags = new boolean[n];
+        inner(flags, 0, t);
+    }
+
+    private static void print(boolean[] flags) {
+        for (int i = 0; i < flags.length; i++) {
+            // chosen positions print as '0'; in reverse mode the flags mark the complement, so the output flips
+            System.out.print(flags[i] == reverse ? "1" : "0");
+        }
+        System.out.println();
+    }
+
+    private static void inner(boolean[] flags, int start, int remaining) {
+        if (remaining <= 0) {
+            print(flags);
+            return;
+        }
+
+        if (flags.length - start < remaining) {
+            return;
+        }
+
+        // choose flags[start]
+        flags[start] = true;
+        inner(flags, start + 1, remaining - 1);
+
+        // skip flags[start]
+        flags[start] = false;
+        inner(flags, start + 1, remaining);
+    }
+
+    public static void main(String[] args) throws IOException {
+        foo(6, 5);
+        foo(5, 2);
+        foo(3, 0);
+
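+        // manual smoke test against a live cluster; "hbase_host" and "test1" are placeholders to adjust before running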
+        Configuration conf = HBaseConfiguration.create();
+        conf.set("hbase.zookeeper.quorum", "hbase_host");
+        conf.set("zookeeper.znode.parent", "/hbase-unsecure");
+
+        HTable table = new HTable(conf, "test1");
+        Put put = new Put(Bytes.toBytes("row1"));
+
+        put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
+        put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
+
+        table.put(put);
+        table.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
new file mode 100644
index 0000000..733de1d
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper2Test.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author yangli9
+ * 
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class CubeHFileMapper2Test extends LocalFileMetadataTestCase {
+
+    String cubeName = "test_kylin_cube_with_slr_ready";
+
+    MeasureCodec codec;
+    ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+    Object[] outKV = new Object[2];
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        // hack for distributed cache
+        FileUtils.deleteDirectory(new File("../job/meta"));
+        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), new File("../job/meta"));
+        CubeDesc desc = CubeManager.getInstance(getTestConfig()).getCube(cubeName).getDescriptor();
+        codec = new MeasureCodec(desc.getMeasures());
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+        FileUtils.deleteDirectory(new File("../job/meta"));
+    }
+
+    @Test
+    public void testBasic() throws Exception {
+
+        Configuration hconf = new Configuration();
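+        // MockupMapContext captures whatever the mapper writes into outKV so it can be asserted below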
+        Context context = MockupMapContext.create(hconf, getTestConfig().getMetadataUrl(), cubeName, outKV);
+
+        CubeHFileMapper mapper = new CubeHFileMapper();
+        mapper.setup(context);
+
+        Text key = new Text("not important");
+        Text value = new Text(new byte[] { 2, 2, 51, -79, 1 });
+
+        mapper.map(key, value, context);
+
+        ImmutableBytesWritable outKey = (ImmutableBytesWritable) outKV[0];
+        KeyValue outValue = (KeyValue) outKV[1];
+
+        assertTrue(Bytes.compareTo(key.getBytes(), 0, key.getLength(), outKey.get(), outKey.getOffset(), outKey.getLength()) == 0);
+
+        assertTrue(Bytes.compareTo(value.getBytes(), 0, value.getLength(), outValue.getValueArray(), outValue.getValueOffset(), outValue.getValueLength()) == 0);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
new file mode 100644
index 0000000..0c148e7
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+/**
+ * @author George Song (ysong1)
+ * 
+ */
+public class CubeHFileMapperTest {
+
+    MapDriver<Text, Text, ImmutableBytesWritable, KeyValue> mapDriver;
+
+    private String cubeName = "FLAT_ITEM_CUBE";
+
+    @Before
+    public void setUp() {
+        CubeHFileMapper mapper = new CubeHFileMapper();
+        mapDriver = MapDriver.newMapDriver(mapper);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    @Ignore("not maintaining")
+    public void testMapper2() throws IOException {
+        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+
+        mapDriver.addInput(new Text("52010tech"), new Text("35.432"));
+
+        List<Pair<ImmutableBytesWritable, KeyValue>> result = mapDriver.run();
+
+        assertEquals(2, result.size());
+
+        byte[] bytes = { 0, 0, 0, 0, 0, 0, 0, 119, 33, 0, 22, 1, 0, 121, 7 };
+        ImmutableBytesWritable key = new ImmutableBytesWritable(bytes);
+
+        Pair<ImmutableBytesWritable, KeyValue> p1 = result.get(0);
+        Pair<ImmutableBytesWritable, KeyValue> p2 = result.get(1);
+
+        assertEquals(key, p1.getFirst());
+        assertEquals("cf1", new String(p1.getSecond().getFamily()));
+        assertEquals("usd_amt", new String(p1.getSecond().getQualifier()));
+        assertEquals("35.43", new String(p1.getSecond().getValue()));
+
+        assertEquals(key, p2.getFirst());
+        assertEquals("cf1", new String(p2.getSecond().getFamily()));
+        assertEquals("item_count", new String(p2.getSecond().getQualifier()));
+        assertEquals("2", new String(p2.getSecond().getValue()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/MockupMapContext.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/MockupMapContext.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/MockupMapContext.java
new file mode 100644
index 0000000..d5c3f60
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/MockupMapContext.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.security.Credentials;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+/**
+ * @author yangli9
+ * 
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MockupMapContext {
+
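+    // Builds a Mapper.Context whose write() records the emitted key/value into the given outKV array;
+    // every other method throws NotImplementedException, which is all the mapper tests need.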
+    public static Context create(final Configuration hconf, String metadataUrl, String cubeName, final Object[] outKV) {
+
+        hconf.set(BatchConstants.CFG_CUBE_NAME, cubeName);
+
+        return new WrappedMapper().getMapContext(new MapContext() {
+
+            @Override
+            public boolean nextKeyValue() throws IOException, InterruptedException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Object getCurrentKey() throws IOException, InterruptedException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Object getCurrentValue() throws IOException, InterruptedException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public void write(Object key, Object value) throws IOException, InterruptedException {
+                System.out.println("Write -- k:" + key + ", v:" + value);
+                if (outKV != null) {
+                    outKV[0] = key;
+                    outKV[1] = value;
+                }
+            }
+
+            @Override
+            public OutputCommitter getOutputCommitter() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public TaskAttemptID getTaskAttemptID() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public void setStatus(String msg) {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public String getStatus() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public float getProgress() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Counter getCounter(Enum<?> counterName) {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Counter getCounter(String groupName, String counterName) {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Configuration getConfiguration() {
+                return hconf;
+            }
+
+            @Override
+            public Credentials getCredentials() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public JobID getJobID() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public int getNumReduceTasks() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Path getWorkingDirectory() throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<?> getOutputKeyClass() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<?> getOutputValueClass() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<?> getMapOutputKeyClass() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<?> getMapOutputValueClass() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public String getJobName() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws ClassNotFoundException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() throws ClassNotFoundException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() throws ClassNotFoundException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() throws ClassNotFoundException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<? extends OutputFormat<?, ?>> getOutputFormatClass() throws ClassNotFoundException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws ClassNotFoundException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public RawComparator<?> getSortComparator() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public String getJar() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public RawComparator<?> getGroupingComparator() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean getJobSetupCleanupNeeded() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean getTaskCleanupNeeded() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean getProfileEnabled() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public String getProfileParams() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public IntegerRanges getProfileTaskRange(boolean isMap) {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public String getUser() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public boolean getSymlink() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Path[] getArchiveClassPaths() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public URI[] getCacheArchives() throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public URI[] getCacheFiles() throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Path[] getLocalCacheArchives() throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Path[] getLocalCacheFiles() throws IOException {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public Path[] getFileClassPaths() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public String[] getArchiveTimestamps() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public String[] getFileTimestamps() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public int getMaxMapAttempts() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public int getMaxReduceAttempts() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public void progress() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public InputSplit getInputSplit() {
+                throw new NotImplementedException();
+            }
+
+            @Override
+            public RawComparator<?> getCombinerKeyGroupingComparator() {
+                throw new NotImplementedException();
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
new file mode 100644
index 0000000..78ea71f
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob;
+
+/**
+ * @author ysong1
+ * 
+ */
+public class RangeKeyDistributionJobTest extends LocalFileMetadataTestCase {
+
+    private Configuration conf;
+
+    @Before
+    public void setup() throws Exception {
+        conf = new Configuration();
+        conf.set("fs.default.name", "file:///");
+        conf.set("mapred.job.tracker", "local");
+
+        // for local runner out-of-memory issue
+        conf.set("mapreduce.task.io.sort.mb", "10");
+        createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testJob() throws Exception {
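+        // run the MR job in local mode over sample cuboid data and expect a zero exit code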
+        String input = "src/test/resources/data/base_cuboid/,src/test/resources/data/6d_cuboid/";
+        String output = "target/test-output/key_distribution_range/";
+        String jobname = "calculate_splits";
+        String cubename = "test_kylin_cube_with_slr_ready";
+
+        FileUtil.fullyDelete(new File(output));
+
+        String[] args = { "-input", input, "-output", output, "-jobname", jobname, "-cubename", cubename };
+        assertEquals("Job failed", 0, ToolRunner.run(conf, new RangeKeyDistributionJob(), args));
+    }
+
+}