You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/02/25 11:12:38 UTC

[1/2] kylin git commit: KYLIN-1323 give “kylin.hbase.hfile.size.gb” a default value

Repository: kylin
Updated Branches:
  refs/heads/1.x-staging 63c59fe9d -> aa8bdde4c


KYLIN-1323 give “kylin.hbase.hfile.size.gb” a default value


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aa8bdde4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aa8bdde4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aa8bdde4

Branch: refs/heads/1.x-staging
Commit: aa8bdde4c540ab321ba6494b2a5f07d8fc186b31
Parents: 07ad877
Author: shaofengshi <sh...@apache.org>
Authored: Thu Feb 25 17:58:10 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Feb 25 18:01:14 2016 +0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/kylin/common/KylinConfig.java  | 2 +-
 conf/kylin.properties                                       | 4 ++--
 .../kylin/job/hadoop/cube/RangeKeyDistributionReducer.java  | 9 ++++-----
 3 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/aa8bdde4/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 9c813cd..c58d419 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -730,7 +730,7 @@ public class KylinConfig {
     }
 
     public int getHBaseHFileSizeGB() {
-        return Integer.parseInt(getOptional(HBASE_HFILE_SIZE_GB, "0"));
+        return Integer.parseInt(getOptional(HBASE_HFILE_SIZE_GB, "5"));
     }
 
     public int getHBaseRegionCountMin() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa8bdde4/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/conf/kylin.properties b/conf/kylin.properties
index bbcf948..5da3d96 100644
--- a/conf/kylin.properties
+++ b/conf/kylin.properties
@@ -84,8 +84,8 @@ kylin.hbase.region.count.min=1
 kylin.hbase.region.count.max=500
 
 # The hfile size of GB, smaller hfile leading to the converting hfile MR has more reducers and be faster
-# set to 0 or comment this config to disable this optimization
-kylin.hbase.hfile.size.gb=0
+# set to 0 or comment this config to disable this optimization;
+kylin.hbase.hfile.size.gb=5
 
 # Enable/disable ACL check for cube query
 kylin.query.security.enabled=true

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa8bdde4/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
index f9c46bb..e51b4b3 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
@@ -18,10 +18,6 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -30,11 +26,14 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.kylin.common.mr.KylinReducer;
-import org.apache.kylin.cube.model.v1.CubeDesc.CubeCapacity;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * @author ysong1
  * 


[2/2] kylin git commit: KYLIN-1323 Improve performance of converting data to hfile

Posted by sh...@apache.org.
KYLIN-1323 Improve performance of converting data to hfile


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/07ad8777
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/07ad8777
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/07ad8777

Branch: refs/heads/1.x-staging
Commit: 07ad8777eaa2788de48290a020590bab80f4d42c
Parents: 63c59fe
Author: sunyerui <su...@gmail.com>
Authored: Mon Feb 22 16:05:17 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Feb 25 18:01:14 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    | 10 ++++
 .../common/util/HBaseRegionSizeCalculator.java  |  8 ++++
 conf/kylin.properties                           |  4 ++
 .../kylin/job/constant/BatchConstants.java      |  1 +
 .../apache/kylin/job/cube/CubingJobBuilder.java |  3 +-
 .../kylin/job/hadoop/AbstractHadoopJob.java     |  2 +-
 .../kylin/job/hadoop/cube/CubeHFileJob.java     | 37 ++++++++++++++-
 .../hadoop/cube/RangeKeyDistributionJob.java    | 13 ++++++
 .../hadoop/cube/RangeKeyDistributionMapper.java |  9 +++-
 .../cube/RangeKeyDistributionReducer.java       | 49 +++++++++++++++++---
 .../kylin/job/BuildCubeWithEngineTest.java      | 42 +++++++++++++++++
 11 files changed, 166 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 7203065..9c813cd 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -131,6 +131,7 @@ public class KylinConfig {
 
     public static final String HTABLE_DEFAULT_COMPRESSION_CODEC = "kylin.hbase.default.compression.codec";
 
+    public static final String HBASE_HFILE_SIZE_GB = "kylin.hbase.hfile.size.gb";
     public static final String HBASE_REGION_CUT_SMALL = "kylin.hbase.region.cut.small";
     public static final String HBASE_REGION_CUT_MEDIUM = "kylin.hbase.region.cut.medium";
     public static final String HBASE_REGION_CUT_LARGE = "kylin.hbase.region.cut.large";
@@ -723,6 +724,15 @@ public class KylinConfig {
         kylinConfig.setProperty(KYLIN_JOB_REMOTE_CLI_PASSWORD, v);
     }
 
+    // for test
+    public void setHBaseHFileSizeGB(int size) {
+        kylinConfig.setProperty(HBASE_HFILE_SIZE_GB, String.valueOf(size));
+    }
+
+    public int getHBaseHFileSizeGB() {
+        return Integer.parseInt(getOptional(HBASE_HFILE_SIZE_GB, "0"));
+    }
+
     public int getHBaseRegionCountMin() {
         return Integer.parseInt(getOptional(HBASE_REGION_COUNT_MIN, "1"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
index 093ac9e..fccd042 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java
@@ -48,6 +48,9 @@ public class HBaseRegionSizeCalculator {
      **/
     private final Map<byte[], Long> sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
 
+    private final Map<byte[], Pair<Integer, Integer>> countMap =
+            new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
     static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable";
 
     /**
@@ -92,6 +95,7 @@ public class HBaseRegionSizeCalculator {
 
                         long regionSizeBytes = regionLoad.getStorefileSizeMB() * megaByte;
                         sizeMap.put(regionId, regionSizeBytes);
+                        countMap.put(regionId, new Pair<>(regionLoad.getStores(), regionLoad.getStorefiles()));
 
                         // logger.info("Region " + regionLoad.getNameAsString()
                         // + " has size " + regionSizeBytes);
@@ -124,4 +128,8 @@ public class HBaseRegionSizeCalculator {
     public Map<byte[], Long> getRegionSizeMap() {
         return Collections.unmodifiableMap(sizeMap);
     }
+
+    public Map<byte[], Pair<Integer, Integer>> getRegionHFileCountMap() {
+        return Collections.unmodifiableMap(countMap);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/conf/kylin.properties b/conf/kylin.properties
index e0727ed..bbcf948 100644
--- a/conf/kylin.properties
+++ b/conf/kylin.properties
@@ -83,6 +83,10 @@ kylin.hbase.region.cut.large=100
 kylin.hbase.region.count.min=1
 kylin.hbase.region.count.max=500
 
+# The hfile size of GB, smaller hfile leading to the converting hfile MR has more reducers and be faster
+# set to 0 or comment this config to disable this optimization
+kylin.hbase.hfile.size.gb=0
+
 # Enable/disable ACL check for cube query
 kylin.query.security.enabled=true
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
index 38f4a87..a3c4c32 100644
--- a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
+++ b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
@@ -47,6 +47,7 @@ public interface BatchConstants {
     String REGION_NUMBER_MIN = "region.number.min";
     String REGION_NUMBER_MAX = "region.number.max";
     String REGION_SPLIT_SIZE = "region.split.size";
+    String HFILE_SIZE_GB = "hfile.size.gb";
     String CUBE_CAPACITY = "cube.capacity";
 
     String CFG_KYLIN_LOCAL_TEMP_DIR = "/tmp/kylin/";

http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index 80c030f..14bb649 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -372,7 +372,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
         StringBuilder cmd = new StringBuilder();
         appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000");
+        appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000");
         appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
 
         createHtableStep.setJobParams(cmd.toString());
@@ -388,6 +388,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         appendMapReduceParameters(cmd, seg);
         appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000_hfile");
         appendExecCmdParameters(cmd, "input", inputPath);
         appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
         appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());

http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index 698a978..a7d107d 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -83,7 +83,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimeter").create("inputdelim");
     protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
     protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
-    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input");
+    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("partitions");
     protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
 
     protected String name;

http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
index 3c1e4a5..d5b83ef 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
@@ -18,17 +18,24 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
+import java.io.IOException;
+
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
@@ -51,11 +58,14 @@ public class CubeHFileJob extends AbstractHadoopJob {
         try {
             options.addOption(OPTION_JOB_NAME);
             options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_PARTITION_FILE_PATH);
             options.addOption(OPTION_INPUT_PATH);
             options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_HTABLE_NAME);
             parseOptions(options, args);
 
+            Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
+
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
 
@@ -82,8 +92,9 @@ public class CubeHFileJob extends AbstractHadoopJob {
             String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
             HTable htable = new HTable(conf, tableName);
 
-            //Automatic config !
+            // Automatic config !
             HFileOutputFormat.configureIncrementalLoad(job, htable);
+            reconfigurePartitions(conf, partitionFilePath);
 
             // set block replication to 3 for hfiles
             conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
@@ -101,6 +112,30 @@ public class CubeHFileJob extends AbstractHadoopJob {
         }
     }
 
+    /**
+     * Check whether a partition file for hfiles exists; if so, use it in place of the table splits so that the job runs with more reducers
+     * @param conf the job configuration
+     * @param path the hfile partition file
+     * @throws IOException
+     */
+    @SuppressWarnings("deprecation")
+    private void reconfigurePartitions(Configuration conf, Path path) throws IOException {
+        FileSystem fs = path.getFileSystem(conf);
+        if (fs.exists(path)) {
+            try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
+                int partitionCount = 0;
+                Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                while (reader.next(key, value)) {
+                    partitionCount++;
+                }
+                TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), path);
+                // The reduce tasks should be one more than partition keys
+                job.setNumReduceTasks(partitionCount+1);
+            }
+        }
+    }
+
     public static void main(String[] args) throws Exception {
         int exitCode = ToolRunner.run(new CubeHFileJob(), args);
         System.exit(exitCode);

http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
index 9c50122..a7832fa 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
@@ -20,7 +20,10 @@ package org.apache.kylin.job.hadoop.cube;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -92,13 +95,23 @@ public class RangeKeyDistributionJob extends AbstractHadoopJob {
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = cubeMgr.getCube(cubeName);
+            int hfileSizeGB = KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB();
             DataModelDesc.RealizationCapacity cubeCapacity = cube.getDescriptor().getModel().getCapacity();
             int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString());
             int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax();
             int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin();
+            job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
+            job.getConfiguration().set(BatchConstants.HFILE_SIZE_GB, String.valueOf(hfileSizeGB));
             job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
             job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
             job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount));
+            // The partition file for hfile is a sequence file consisting of ImmutableBytesWritable keys and NullWritable values
+            TableMapReduceUtil.addDependencyJars(job.getConfiguration(), ImmutableBytesWritable.class, NullWritable.class);
+
+            // Pass the sandbox property to the mapper, to simulate a large dataset
+            if (System.getProperty("useSandbox") != null && System.getProperty("useSandbox").equals("true")) {
+                job.getConfiguration().setBoolean("useSandbox", true);
+            }
 
             return waitForCompletion(job);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
index 33baf45..41856b4 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
@@ -38,9 +38,14 @@ public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, Lo
 
     private Text lastKey;
 
+    private Long scaleFactorForSandbox = 1L;
+
     @Override
     protected void setup(Context context) throws IOException {
         super.publishConfiguration(context.getConfiguration());
+        if (context.getConfiguration().getBoolean("useSandbox", false)) {
+            scaleFactorForSandbox = 1024L;
+        }
     }
 
     @Override
@@ -50,8 +55,8 @@ public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, Lo
         int bytesLength = key.getLength() + value.getLength();
         bytesRead += bytesLength;
 
-        if (bytesRead >= ONE_MEGA_BYTES) {
-            outputValue.set(bytesRead);
+        if ((bytesRead * scaleFactorForSandbox) >= ONE_MEGA_BYTES) {
+            outputValue.set(bytesRead * scaleFactorForSandbox);
             context.write(key, outputValue);
 
             // reset bytesRead

http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
index b3ab4db..f9c46bb 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
@@ -22,7 +22,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.kylin.common.mr.KylinReducer;
@@ -46,13 +50,23 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
     private int minRegionCount = 1;
     private int maxRegionCount = 500;
     private int cut = 10;
+    private int hfileSizeGB = 1;
     private long bytesRead = 0;
     private List<Text> gbPoints = new ArrayList<Text>();
+    private String output = null;
 
     @Override
     protected void setup(Context context) throws IOException {
         super.publishConfiguration(context.getConfiguration());
 
+        if (context.getConfiguration().get(BatchConstants.OUTPUT_PATH) != null) {
+            output = context.getConfiguration().get(BatchConstants.OUTPUT_PATH);
+        }
+
+        if (context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB) != null) {
+            hfileSizeGB = Integer.valueOf(context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB));
+        }
+
         if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) {
             cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE));
         }
@@ -65,7 +79,11 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
             maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX));
         }
 
-        logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount + ", min region count =" + minRegionCount);
+        logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount
+                + ", min region count=" + minRegionCount + ", hfile size=" + hfileSizeGB);
+
+        // add empty key at position 0
+        gbPoints.add(new Text());
     }
 
     @Override
@@ -89,14 +107,31 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
         int gbPerRegion = gbPoints.size() / nRegion;
         gbPerRegion = Math.max(1, gbPerRegion);
 
+        if (hfileSizeGB <= 0) {
+            hfileSizeGB = gbPerRegion;
+        }
+        int hfilePerRegion = gbPerRegion / hfileSizeGB;
+        hfilePerRegion = Math.max(1, hfilePerRegion);
         System.out.println(nRegion + " regions");
         System.out.println(gbPerRegion + " GB per region");
-
-        for (int i = gbPerRegion; i < gbPoints.size(); i += gbPerRegion) {
-            Text key = gbPoints.get(i);
-            outputValue.set(i);
-            System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
-            context.write(key, outputValue);
+        System.out.println(hfilePerRegion + " hfile per region");
+
+        Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile");
+        try (SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(
+                hfilePartitionFile.getFileSystem(context.getConfiguration()),
+                context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class)) {
+            int hfileCountInOneRegion = 0;
+            for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) {
+                hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get());
+                if (++hfileCountInOneRegion >= hfilePerRegion) {
+                    Text key = gbPoints.get(i);
+                    outputValue.set(i);
+                    System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
+                    context.write(key, outputValue);
+
+                    hfileCountInOneRegion = 0;
+                }
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index f02aa7a..78e6ab3 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -21,9 +21,11 @@ package org.apache.kylin.job;
 import static org.junit.Assert.assertEquals;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Method;
 import java.text.SimpleDateFormat;
 import java.util.List;
+import java.util.Map;
 import java.util.TimeZone;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -33,11 +35,16 @@ import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.ZookeeperJobLock;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.common.util.HBaseRegionSizeCalculator;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -129,9 +136,11 @@ public class BuildCubeWithEngineTest {
     }
 
     private void testInner() throws Exception {
+        KylinConfig.getInstanceFromEnv().setHBaseHFileSizeGB(1);
         DeployUtil.prepareTestData("inner", "test_kylin_cube_with_slr_empty");
         String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", };
         runTestAndAssertSucceed(testCase);
+        KylinConfig.getInstanceFromEnv().setHBaseHFileSizeGB(0);
     }
 
     private void testLeft() throws Exception {
@@ -280,6 +289,7 @@ public class BuildCubeWithEngineTest {
         CubingJob job = cubingJobBuilder.buildJob(segment);
         jobService.addJob(job);
         waitForJob(job.getId());
+        checkHFilesInHBase(segment);
         return job.getId();
     }
 
@@ -295,4 +305,36 @@ public class BuildCubeWithEngineTest {
 
         return jobId;
     }
+
+    private void checkHFilesInHBase(CubeSegment segment) throws IOException {
+        Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
+        String tableName = segment.getStorageLocationIdentifier();
+        HTable table = new HTable(conf, tableName);
+        HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table);
+        Map<byte[], Long> sizeMap = cal.getRegionSizeMap();
+        long totalSize = 0;
+        for (Long size : sizeMap.values()) {
+            totalSize += size;
+        }
+        if (totalSize == 0) {
+            return;
+        }
+        Map<byte[], Pair<Integer, Integer>> countMap = cal.getRegionHFileCountMap();
+        // check whether any region contains more than one hfile, which means the hfile config takes effect
+        boolean hasMultiHFileRegions = false;
+        for (Pair<Integer, Integer> count : countMap.values()) {
+            // check if hfile count is greater than store count
+            if (count.getSecond() > count.getFirst()) {
+                hasMultiHFileRegions = true;
+                break;
+            }
+        }
+        if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() == 0 && hasMultiHFileRegions) {
+            throw new IOException("hfile size set to 0, but found region contains more than one hfiles");
+        } else if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() > 0 && !hasMultiHFileRegions) {
+            throw new IOException("hfile size set greater than 0, but all regions still has only one hfile");
+        }
+    }
+
 }
+