Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/04 03:02:43 UTC

[02/43] kylin git commit: KYLIN-1323 Improve performance of converting data to hfile

KYLIN-1323 Improve performance of converting data to hfile


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ab4d8909
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ab4d8909
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ab4d8909

Branch: refs/heads/helix-rebase
Commit: ab4d8909ac85af87d8b8b443044c49a79f9e3ee4
Parents: 66294d3
Author: sunyerui <su...@gmail.com>
Authored: Sun Feb 28 21:02:25 2016 +0800
Committer: sunyerui <su...@gmail.com>
Committed: Sun Feb 28 21:12:28 2016 +0800

----------------------------------------------------------------------
 build/conf/kylin.properties                     |   4 +
 .../apache/kylin/common/KylinConfigBase.java    |   8 ++
 .../engine/mr/common/AbstractHadoopJob.java     |   2 +-
 .../kylin/engine/mr/common/BatchConstants.java  |   1 +
 .../mr/steps/RangeKeyDistributionJob.java       | 115 ----------------
 .../mr/steps/RangeKeyDistributionMapper.java    |  71 ----------
 .../mr/steps/RangeKeyDistributionReducer.java   | 100 --------------
 kylin-it/pom.xml                                |   3 +
 .../kylin/provision/BuildCubeWithEngine.java    |  41 ++++++
 .../kylin/storage/hbase/steps/CubeHFileJob.java |  37 ++++-
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |   4 +-
 .../hbase/steps/RangeKeyDistributionJob.java    | 127 +++++++++++++++++
 .../hbase/steps/RangeKeyDistributionMapper.java |  76 +++++++++++
 .../steps/RangeKeyDistributionReducer.java      | 136 +++++++++++++++++++
 .../hbase/util/HBaseRegionSizeCalculator.java   |   8 ++
 .../steps/RangeKeyDistributionJobTest.java      |   1 -
 .../steps/RangeKeyDistributionMapperTest.java   |   1 -
 .../steps/RangeKeyDistributionReducerTest.java  |   1 -
 18 files changed, 443 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 44a282e..b220b2d 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -65,6 +65,10 @@ kylin.hbase.region.cut.small=5
 kylin.hbase.region.cut.medium=10
 kylin.hbase.region.cut.large=50
 
+# The hfile size in GB; a smaller hfile size gives the convert-to-hfile MR job more reducers and makes it faster.
+# Set to 0 or comment out this config to disable the optimization.
+kylin.hbase.hfile.size.gb=5
+
 # Enable/disable ACL check for cube query
 kylin.query.security.enabled=true
 

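For a rough sense of what this setting does: per the RangeKeyDistributionReducer and CubeHFileJob changes later in this diff, one hfile split key is written for every kylin.hbase.hfile.size.gb gigabytes of cube data, and the convert-to-hfile job gets one reducer per split key plus one. A back-of-envelope sketch with hypothetical sizes (the real split keys come from the measured rowkey distribution):

// Back-of-envelope sketch, not part of this commit; all sizes are hypothetical.
public class HFileReducerEstimate {
    public static void main(String[] args) {
        int cubeSizeGB = 100;  // hypothetical segment size measured by the rowkey distribution step
        int hfileSizeGB = 5;   // kylin.hbase.hfile.size.gb

        int splitKeys = cubeSizeGB / hfileSizeGB;   // one hfile split key per hfileSizeGB of data
        int reduceTasks = splitKeys + 1;            // CubeHFileJob sets reducers = partitionCount + 1

        System.out.println(splitKeys + " hfile split keys -> " + reduceTasks + " reducers");
        // prints: 20 hfile split keys -> 21 reducers
        // (previously the convert job got one reducer per HBase region)
    }
}
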
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7707684..3430e0b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -407,6 +407,14 @@ public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.hbase.region.count.max", "500"));
     }
 
+    public void setHBaseHFileSizeGB(int size) {
+        setProperty("kylin.hbase.hfile.size.gb", String.valueOf(size));
+    }
+
+    public int getHBaseHFileSizeGB() {
+        return Integer.parseInt(getOptional("kylin.hbase.hfile.size.gb", "0"));
+    }
+
     public int getScanThreshold() {
         return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 7615269..e4eee96 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -83,7 +83,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat");
     protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
     protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
-    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input");
+    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("partitions");
     protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
 
     protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName("statisticsenabled").hasArg().isRequired(false).withDescription("Statistics enabled").create("statisticsenabled");

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 400a3aa..6943f18 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -42,6 +42,7 @@ public interface BatchConstants {
     String REGION_NUMBER_MIN = "region.number.min";
     String REGION_NUMBER_MAX = "region.number.max";
     String REGION_SPLIT_SIZE = "region.split.size";
+    String HFILE_SIZE_GB = "hfile.size.gb";
     
     String CFG_KYLIN_LOCAL_TEMP_DIR = "/tmp/kylin/";
     String CFG_KYLIN_HDFS_TEMP_DIR = "/tmp/kylin/";

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java
deleted file mode 100644
index 5632fc1..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionJob.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang, ysong1
- * 
- */
-
-public class RangeKeyDistributionJob extends AbstractHadoopJob {
-    protected static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionJob.class);
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
-     */
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_CUBE_NAME);
-
-            parseOptions(options, args);
-
-            // start job
-            String jobName = getOptionValue(OPTION_JOB_NAME);
-            job = Job.getInstance(getConf(), jobName);
-
-            setJobClasspath(job);
-
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
-            // job.getConfiguration().set("dfs.block.size", "67108864");
-
-            // Mapper
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapperClass(RangeKeyDistributionMapper.class);
-            job.setMapOutputKeyClass(Text.class);
-            job.setMapOutputValueClass(LongWritable.class);
-
-            // Reducer - only one
-            job.setReducerClass(RangeKeyDistributionReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(LongWritable.class);
-            job.setNumReduceTasks(1);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-            DataModelDesc.RealizationCapacity cubeCapacity = cube.getDescriptor().getModel().getCapacity();
-            int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString());
-            int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax();
-            int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin();
-            
-            job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
-            job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
-            job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount));
-            
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new RangeKeyDistributionJob(), args);
-        System.exit(exitCode);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionMapper.java
deleted file mode 100644
index 47cbc95..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionMapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.engine.mr.KylinMapper;
-
-/**
- * @author ysong1
- * 
- */
-public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-
-    private static final long ONE_MEGA_BYTES = 1L * 1024L * 1024L;
-
-    private LongWritable outputValue = new LongWritable(0);
-
-    private long bytesRead = 0;
-
-    private Text lastKey;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-    }
-
-    @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        lastKey = key;
-
-        int bytesLength = key.getLength() + value.getLength();
-        bytesRead += bytesLength;
-
-        if (bytesRead >= ONE_MEGA_BYTES) {
-            outputValue.set(bytesRead);
-            context.write(key, outputValue);
-
-            // reset bytesRead
-            bytesRead = 0;
-        }
-
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        if (lastKey != null) {
-            outputValue.set(bytesRead);
-            context.write(lastKey, outputValue);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java
deleted file mode 100644
index 68be74e..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RangeKeyDistributionReducer.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author ysong1
- * 
- */
-public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-
-    public static final long ONE_GIGA_BYTES = 1024L * 1024L * 1024L;
-    private static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionReducer.class);
-
-    private LongWritable outputValue = new LongWritable(0);
-
-    private int minRegionCount = 1;
-    private int maxRegionCount = 500;
-    private int cut = 10;
-    private long bytesRead = 0;
-    private List<Text> gbPoints = new ArrayList<Text>();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) {
-            cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE));
-        }
-
-        if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN) != null) {
-            minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN));
-        }
-
-        if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX) != null) {
-            maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX));
-        }
-
-        logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount + ", min region count =" + minRegionCount);
-    }
-
-    @Override
-    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-        for (LongWritable v : values) {
-            bytesRead += v.get();
-        }
-
-        if (bytesRead >= ONE_GIGA_BYTES) {
-            gbPoints.add(new Text(key));
-            bytesRead = 0; // reset bytesRead
-        }
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        int nRegion = Math.round((float) gbPoints.size() / (float) cut);
-        nRegion = Math.max(minRegionCount, nRegion);
-        nRegion = Math.min(maxRegionCount, nRegion);
-
-        int gbPerRegion = gbPoints.size() / nRegion;
-        gbPerRegion = Math.max(1, gbPerRegion);
-
-        System.out.println(nRegion + " regions");
-        System.out.println(gbPerRegion + " GB per region");
-
-        for (int i = gbPerRegion; i < gbPoints.size(); i += gbPerRegion) {
-            Text key = gbPoints.get(i);
-            outputValue.set(i);
-            System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
-            context.write(key, outputValue);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 6cb44a5..99b650c 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -301,6 +301,7 @@
                                     <classpathScope>test</classpathScope>
                                     <executable>java</executable>
                                     <arguments>
+                                        <argument>-DuseSandbox=true</argument>
                                         <argument>-Dhdp.version=${hdp.version}</argument>
                                         <argument>-DfastBuildMode=${fastBuildMode}</argument>
                                         <argument>-classpath</argument>
@@ -321,6 +322,7 @@
                                     <classpathScope>test</classpathScope>
                                     <executable>java</executable>
                                     <arguments>
+                                        <argument>-DuseSandbox=true</argument>
                                         <argument>-Dhdp.version=${hdp.version}</argument>
                                         <argument>-DfastBuildMode=${fastBuildMode}</argument>
                                         <argument>-classpath</argument>
@@ -341,6 +343,7 @@
                                     <classpathScope>test</classpathScope>
                                     <executable>java</executable>
                                     <arguments>
+                                        <argument>-DuseSandbox=true</argument>
                                         <argument>-Dhdp.version=${hdp.version}</argument>
                                         <argument>-DfastBuildMode=${fastBuildMode}</argument>
                                         <argument>-classpath</argument>

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 28808df..cfefef3 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.text.SimpleDateFormat;
 import java.util.List;
+import java.util.Map;
 import java.util.TimeZone;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -36,11 +37,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -55,6 +59,8 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.metadata.model.IEngineAware;
+import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
 import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
 import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 
@@ -143,8 +149,10 @@ public class BuildCubeWithEngine {
 
     public void build() throws Exception {
         DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_left_join_empty");
+        KylinConfig.getInstanceFromEnv().setHBaseHFileSizeGB(1);
         testInner();
         testLeft();
+        KylinConfig.getInstanceFromEnv().setHBaseHFileSizeGB(0);
     }
 
     protected void waitForJob(String jobId) {
@@ -345,6 +353,9 @@ public class BuildCubeWithEngine {
         DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());
+        if (segment.getCubeDesc().getEngineType() == IEngineAware.ID_MR_V1) {
+            checkHFilesInHBase(segment);
+        }
         return job.getId();
     }
 
@@ -355,4 +366,34 @@ public class BuildCubeWithEngine {
         return exitCode;
     }
 
+    private void checkHFilesInHBase(CubeSegment segment) throws IOException {
+        Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
+        String tableName = segment.getStorageLocationIdentifier();
+        HTable table = new HTable(conf, tableName);
+        HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table);
+        Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
+        long totalSize = 0;
+        for (Long size : sizeMap.values()) {
+            totalSize += size;
+        }
+        if (totalSize == 0) {
+            return;
+        }
+        Map<byte[], Pair<Integer, Integer>> countMap = cal.getRegionHFileCountMap();
+        // check whether any region contains more than one hfile, which means the hfile size config took effect
+        boolean hasMultiHFileRegions = false;
+        for (Pair<Integer, Integer> count : countMap.values()) {
+            // check if hfile count is greater than store count
+            if (count.getSecond() > count.getFirst()) {
+                hasMultiHFileRegions = true;
+                break;
+            }
+        }
+        if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() == 0 && hasMultiHFileRegions) {
+            throw new IOException("hfile size set to 0, but found region contains more than one hfiles");
+        } else if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() > 0 && !hasMultiHFileRegions) {
+            throw new IOException("hfile size set greater than 0, but all regions still has only one hfile");
+        }
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index 1f0b1a0..a302daf 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -18,17 +18,24 @@
 
 package org.apache.kylin.storage.hbase.steps;
 
+import java.io.IOException;
+
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
@@ -51,11 +58,14 @@ public class CubeHFileJob extends AbstractHadoopJob {
         try {
             options.addOption(OPTION_JOB_NAME);
             options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_PARTITION_FILE_PATH);
             options.addOption(OPTION_INPUT_PATH);
             options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_HTABLE_NAME);
             parseOptions(options, args);
 
+            Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
+
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
 
@@ -82,8 +92,9 @@ public class CubeHFileJob extends AbstractHadoopJob {
             String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
             HTable htable = new HTable(conf, tableName);
 
-            //Automatic config !
+            // Automatic config !
             HFileOutputFormat.configureIncrementalLoad(job, htable);
+            reconfigurePartitions(conf, partitionFilePath);
 
             // set block replication to 3 for hfiles
             conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
@@ -101,6 +112,30 @@ public class CubeHFileJob extends AbstractHadoopJob {
         }
     }
 
+    /**
+     * Check whether a partition file exists for the hfiles; if so, replace the table-based splits with it so the job gets more reducers
+     * @param conf the job configuration
+     * @param path the hfile partition file
+     * @throws IOException
+     */
+    @SuppressWarnings("deprecation")
+    private void reconfigurePartitions(Configuration conf, Path path) throws IOException {
+        FileSystem fs = path.getFileSystem(conf);
+        if (fs.exists(path)) {
+            try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
+                int partitionCount = 0;
+                Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                while (reader.next(key, value)) {
+                    partitionCount++;
+                }
+                TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), path);
+                // The number of reduce tasks should be one more than the number of partition keys
+                job.setNumReduceTasks(partitionCount+1);
+            }
+        }
+    }
+
     public static void main(String[] args) throws Exception {
         int exitCode = ToolRunner.run(new CubeHFileJob(), args);
         System.exit(exitCode);

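The partition file consumed by reconfigurePartitions() above is the part-r-00000_hfile sequence file written by RangeKeyDistributionReducer later in this diff: ImmutableBytesWritable split keys with NullWritable values, handed to TotalOrderPartitioner so that N keys yield N + 1 reducer ranges. A minimal sketch of that file format, using a made-up local path and rowkeys:

// Minimal sketch of the hfile partition-file format; the path and rowkeys below are made up.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;

public class HFilePartitionFileSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/hfile_partitions_demo");  // hypothetical path

        byte[][] splitKeys = { { 0x00, 0x05 }, { 0x00, 0x0A }, { 0x00, 0x0F } };  // made-up rowkeys

        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(ImmutableBytesWritable.class),
                SequenceFile.Writer.valueClass(NullWritable.class))) {
            for (byte[] key : splitKeys) {
                writer.append(new ImmutableBytesWritable(key), NullWritable.get());
            }
        }
        // 3 split keys -> TotalOrderPartitioner builds 4 key ranges -> 4 reduce tasks
    }
}
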
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index c3bd7b5..2a21640 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -12,7 +12,6 @@ import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.storage.hbase.ii.IIBulkLoadJob;
 import org.apache.kylin.storage.hbase.ii.IICreateHFileJob;
 import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
-import org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.realization.IRealizationSegment;
@@ -72,7 +71,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
         StringBuilder cmd = new StringBuilder();
         appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+        appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
         appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
 
         createHtableStep.setJobParams(cmd.toString());
@@ -90,6 +89,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
 
         appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
         appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
         appendExecCmdParameters(cmd, "input", inputPath);
         appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
         appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
new file mode 100644
index 0000000..2ff7356
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author xjiang, ysong1
+ * 
+ */
+
+public class RangeKeyDistributionJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionJob.class);
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+
+            parseOptions(options, args);
+
+            // start job
+            String jobName = getOptionValue(OPTION_JOB_NAME);
+            job = Job.getInstance(getConf(), jobName);
+
+            setJobClasspath(job);
+
+            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
+
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            FileOutputFormat.setOutputPath(job, output);
+            // job.getConfiguration().set("dfs.block.size", "67108864");
+
+            // Mapper
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+            job.setMapperClass(RangeKeyDistributionMapper.class);
+            job.setMapOutputKeyClass(Text.class);
+            job.setMapOutputValueClass(LongWritable.class);
+
+            // Reducer - only one
+            job.setReducerClass(RangeKeyDistributionReducer.class);
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(LongWritable.class);
+            job.setNumReduceTasks(1);
+
+            this.deletePath(job.getConfiguration(), output);
+
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            int hfileSizeGB = KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB();
+            DataModelDesc.RealizationCapacity cubeCapacity = cube.getDescriptor().getModel().getCapacity();
+            int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString());
+            int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax();
+            int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin();
+            job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+            job.getConfiguration().set(BatchConstants.HFILE_SIZE_GB, String.valueOf(hfileSizeGB));
+            job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
+            job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
+            job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount));
+            // The hfile partition file is a sequence file consisting of ImmutableBytesWritable keys and NullWritable values
+            TableMapReduceUtil.addDependencyJars(job.getConfiguration(), ImmutableBytesWritable.class, NullWritable.class);
+
+            // Pass the sandbox property to the mapper, to simulate a large dataset
+            if (System.getProperty("useSandbox") != null && System.getProperty("useSandbox").equals("true")) {
+                job.getConfiguration().setBoolean("useSandbox", true);
+            }
+            
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new RangeKeyDistributionJob(), args);
+        System.exit(exitCode);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
new file mode 100644
index 0000000..6f2d2bc
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.engine.mr.KylinMapper;
+
+/**
+ * @author ysong1
+ * 
+ */
+public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, LongWritable> {
+
+    private static final long ONE_MEGA_BYTES = 1L * 1024L * 1024L;
+
+    private LongWritable outputValue = new LongWritable(0);
+
+    private long bytesRead = 0;
+
+    private Text lastKey;
+
+    private Long scaleFactorForSandbox = 1L;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        if (context.getConfiguration().getBoolean("useSandbox", false)) {
+            scaleFactorForSandbox = 1024L;
+        }
+    }
+
+    @Override
+    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+        lastKey = key;
+
+        int bytesLength = key.getLength() + value.getLength();
+        bytesRead += bytesLength;
+
+        if ((bytesRead * scaleFactorForSandbox) >= ONE_MEGA_BYTES) {
+            outputValue.set(bytesRead * scaleFactorForSandbox);
+            context.write(key, outputValue);
+
+            // reset bytesRead
+            bytesRead = 0;
+        }
+
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        if (lastKey != null) {
+            outputValue.set(bytesRead);
+            context.write(lastKey, outputValue);
+        }
+    }
+
+}

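The scaleFactorForSandbox added here makes every scanned byte count 1024 times when -DuseSandbox=true, so the small sandbox dataset crosses the mapper's 1 MB sampling threshold (and, downstream, the reducer's 1 GB split threshold) as if it were a much larger cube. A tiny arithmetic sketch with hypothetical byte counts:

// Tiny sketch of the sandbox scaling; the byte counts are hypothetical.
public class SandboxScaleSketch {
    public static void main(String[] args) {
        final long ONE_MEGA_BYTES = 1024L * 1024L;
        long scaleFactor = 1024L;   // applied when the job sets useSandbox=true
        long bytesRead = 1100L;     // ~1 KB actually scanned in the sandbox

        boolean emitSample = bytesRead * scaleFactor >= ONE_MEGA_BYTES;
        System.out.println("emit split sample: " + emitSample);  // true, since 1100 * 1024 > 1 MB
    }
}
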
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
new file mode 100644
index 0000000..acdab62
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author ysong1
+ * 
+ */
+public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
+
+    public static final long ONE_GIGA_BYTES = 1024L * 1024L * 1024L;
+    private static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionReducer.class);
+
+    private LongWritable outputValue = new LongWritable(0);
+
+    private int minRegionCount = 1;
+    private int maxRegionCount = 500;
+    private int cut = 10;
+    private int hfileSizeGB = 1;
+    private long bytesRead = 0;
+    private List<Text> gbPoints = new ArrayList<Text>();
+    private String output = null;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        if (context.getConfiguration().get(BatchConstants.OUTPUT_PATH) != null) {
+            output = context.getConfiguration().get(BatchConstants.OUTPUT_PATH);
+        }
+
+        if (context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB) != null) {
+            hfileSizeGB = Integer.valueOf(context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB));
+        }
+
+        if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) {
+            cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE));
+        }
+
+        if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN) != null) {
+            minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN));
+        }
+
+        if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX) != null) {
+            maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX));
+        }
+
+        logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount
+            + ", min region count=" + minRegionCount + ", hfile size=" + hfileSizeGB);
+
+        // add empty key at position 0
+        gbPoints.add(new Text());
+    }
+
+    @Override
+    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+        for (LongWritable v : values) {
+            bytesRead += v.get();
+        }
+
+        if (bytesRead >= ONE_GIGA_BYTES) {
+            gbPoints.add(new Text(key));
+            bytesRead = 0; // reset bytesRead
+        }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        int nRegion = Math.round((float) gbPoints.size() / (float) cut);
+        nRegion = Math.max(minRegionCount, nRegion);
+        nRegion = Math.min(maxRegionCount, nRegion);
+
+        int gbPerRegion = gbPoints.size() / nRegion;
+        gbPerRegion = Math.max(1, gbPerRegion);
+
+        if (hfileSizeGB <= 0) {
+            hfileSizeGB = gbPerRegion;
+        }
+        int hfilePerRegion = gbPerRegion / hfileSizeGB;
+        hfilePerRegion = Math.max(1, hfilePerRegion);
+
+        System.out.println(nRegion + " regions");
+        System.out.println(gbPerRegion + " GB per region");
+        System.out.println(hfilePerRegion + " hfile per region");
+
+        Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile");
+        try (SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(
+                hfilePartitionFile.getFileSystem(context.getConfiguration()),
+                context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class)) {
+            int hfileCountInOneRegion = 0;
+            for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) {
+                hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get());
+                if (++hfileCountInOneRegion >= hfilePerRegion) {
+                    Text key = gbPoints.get(i);
+                    outputValue.set(i);
+                    System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
+                    context.write(key, outputValue);
+
+                    hfileCountInOneRegion = 0;
+                }
+            }
+        }
+    }
+}

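A worked example of the split math in cleanup(), using hypothetical sizes (roughly 100 GB of cuboid data, the 10 GB "medium" region cut, hfile size 2 GB); the real inputs come from the job configuration and the measured GB points:

// Worked example of the cleanup() split math; all sizes are hypothetical.
public class RangeKeySplitMathSketch {
    public static void main(String[] args) {
        int gbPointCount = 101;   // one empty key at position 0 plus one point per GB (~100 GB of data)
        int cut = 10;             // kylin.hbase.region.cut.medium, in GB
        int minRegionCount = 1;
        int maxRegionCount = 500;
        int hfileSizeGB = 2;      // kylin.hbase.hfile.size.gb

        int nRegion = Math.round((float) gbPointCount / (float) cut);   // 10
        nRegion = Math.min(maxRegionCount, Math.max(minRegionCount, nRegion));

        int gbPerRegion = Math.max(1, gbPointCount / nRegion);          // 10
        int hfilePerRegion = Math.max(1, gbPerRegion / hfileSizeGB);    // 5
        int hfileSplitKeys = (gbPointCount - 1) / hfileSizeGB;          // 50 keys in part-r-00000_hfile

        System.out.println(nRegion + " regions, " + gbPerRegion + " GB per region, "
                + hfilePerRegion + " hfiles per region, " + hfileSplitKeys + " hfile split keys");
    }
}
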
http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
index ba0da00..346c3a2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.kylin.common.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,6 +50,8 @@ public class HBaseRegionSizeCalculator {
      **/
     private final Map<byte[], Long> sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
 
+    private final Map<byte[], Pair<Integer, Integer>> countMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
     static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable";
 
     /**
@@ -93,6 +96,7 @@ public class HBaseRegionSizeCalculator {
 
                         long regionSizeBytes = regionLoad.getStorefileSizeMB() * megaByte;
                         sizeMap.put(regionId, regionSizeBytes);
+                        countMap.put(regionId, new Pair<>(regionLoad.getStores(), regionLoad.getStorefiles()));
 
                         // logger.info("Region " + regionLoad.getNameAsString()
                         // + " has size " + regionSizeBytes);
@@ -125,4 +129,8 @@ public class HBaseRegionSizeCalculator {
     public Map<byte[], Long> getRegionSizeMap() {
         return Collections.unmodifiableMap(sizeMap);
     }
+
+    public Map<byte[], Pair<Integer, Integer>> getRegionHFileCountMap() {
+        return Collections.unmodifiableMap(countMap);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
index 7f5b24b..70e1ac7 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJobTest.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java
index ca716c3..03a3cba 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapperTest.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mrunit.mapreduce.MapDriver;
 import org.apache.hadoop.mrunit.types.Pair;
-import org.apache.kylin.engine.mr.steps.RangeKeyDistributionMapper;
 import org.junit.Before;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ab4d8909/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java
index cbf0657..c027c40 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducerTest.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
-import org.apache.kylin.engine.mr.steps.RangeKeyDistributionReducer;
 import org.junit.Before;
 import org.junit.Test;