You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:42 UTC

[30/50] [abbrv] incubator-kylin git commit: KYLIN-750 Merge cube segments from HBase table

KYLIN-750 Merge cube segments from HBase table


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e008e2cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e008e2cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e008e2cc

Branch: refs/heads/streaming-localdict
Commit: e008e2ccdba67a0b4d0ef065d4d0a368ff09c1e7
Parents: 876ac60
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Tue May 12 10:26:04 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Tue May 12 10:26:43 2015 +0800

----------------------------------------------------------------------
 .../kylin/common/persistence/ResourceStore.java |   2 +
 .../kylin/job/constant/ExecutableConstants.java |   1 +
 .../apache/kylin/job/cube/CubingJobBuilder.java |  68 +++++--
 .../cardinality/ColumnCardinalityMapper.java    |   2 +-
 .../job/hadoop/cube/FactDistinctColumnsJob.java |  45 ++++-
 .../hadoop/cube/FactDistinctColumnsReducer.java |  67 ++++---
 .../hadoop/cubev2/MergeCuboidFromHBaseJob.java  |  22 ++-
 .../cubev2/MergeCuboidFromHBaseMapper.java      |   5 +-
 .../job/hadoop/cubev2/MergeStatisticsStep.java  | 181 +++++++++++++++++++
 .../kylin/job/hadoop/hbase/CreateHTableJob.java |  54 ++++--
 10 files changed, 368 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 96153fd..a99ca5d 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -53,6 +53,8 @@ abstract public class ResourceStore {
     public static final String HYBRID_RESOURCE_ROOT = "/hybrid";
     public static final String STREAMING_RESOURCE_ROOT = "/streaming";
     public static final String STREAMING_OUTPUT_RESOURCE_ROOT = "/streaming_output";
+    public static final String CUBE_STATISTICS_ROOT = "/cube_statistics";
+
 
     private static ConcurrentHashMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig, ResourceStore>();
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 50e9c3a..2e5d97a 100644
--- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -54,6 +54,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
     public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
     public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
+    public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
     public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
     public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
     

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index a01f462..6593ed2 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -33,6 +33,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.hadoop.cube.*;
 import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
 import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
+import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
 import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
 import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
 import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
@@ -112,18 +113,21 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
                 mergingCuboidPaths.add(getPathToMerge(merging));
         }
 
-        if(this.inMemoryCubing()) {
+        AbstractExecutable convertCuboidToHfileStep;
+
+        if (this.inMemoryCubing()) {
             // merge from HTable
-            addMergeFromHBaseSteps(mergeSegment, mergingSegmentIds, mergingHTables, mergedRootPath, result);
+            convertCuboidToHfileStep = addMergeFromHBaseSteps(mergeSegment, mergingSegmentIds, mergingHTables, mergedRootPath, result);
+
+            // bulk load step
+            result.addTask(createBulkLoadStep(mergeSegment, result.getId()));
         } else {
             // merge cuboid
             addMergeSteps(mergeSegment, mergingSegmentIds, mergingCuboidPaths, mergedRootPath, result);
+            // convert htable
+            convertCuboidToHfileStep = addHTableSteps(mergeSegment, mergedRootPath, result);
         }
 
-
-        // convert htable
-        AbstractExecutable convertCuboidToHfileStep = addHTableSteps(mergeSegment, mergedRootPath, result);
-
         // update cube info
         result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
 
@@ -148,15 +152,20 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
             mergingHTables.add(merging.getStorageLocationIdentifier());
         }
 
-        if(this.inMemoryCubing()) {
+
+        AbstractExecutable convertCuboidToHfileStep;
+        if (this.inMemoryCubing()) {
             // merge from HTable
-            addMergeFromHBaseSteps(seg, mergingSegmentIds, mergingHTables, mergedCuboidPath, result);
+            convertCuboidToHfileStep = addMergeFromHBaseSteps(seg, mergingSegmentIds, mergingHTables, mergedCuboidPath, result);
+
+            // bulk load step
+            result.addTask(createBulkLoadStep(seg, result.getId()));
         } else {
             // merge cuboid
             addMergeSteps(seg, mergingSegmentIds, mergingCuboidPaths, mergedCuboidPath, result);
+            // convert htable
+            convertCuboidToHfileStep = addHTableSteps(seg, mergedCuboidPath, result);
         }
-        // convert htable
-        AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, mergedCuboidPath, result);
 
         // update cube info
         result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
@@ -173,12 +182,22 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
     }
 
 
-    void addMergeFromHBaseSteps(CubeSegment seg, List<String> mergingSegmentIds, List<String> mergingHTables, String mergedCuboidPath, CubingJob result) {
+    AbstractExecutable addMergeFromHBaseSteps(CubeSegment seg, List<String> mergingSegmentIds, List<String> mergingHTables, String mergedCuboidPath, CubingJob result) {
 
         result.addTask(createMergeDictionaryStep(seg, mergingSegmentIds));
 
+        String mergedStatisticsFolder = getStatisticsPath(seg, result.getId());
+        result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, mergedStatisticsFolder));
+
+        // create htable step
+        result.addTask(createCreateHTableStep(seg));
+
         String formattedTables = StringUtils.join(mergingHTables, ",");
-        result.addTask(createMergeCuboidDataFromHBaseStep(seg, formattedTables, mergedCuboidPath));
+        String hFilePath = getHFilePath(seg, result.getId());
+        MapReduceExecutable writeHFileStep = createMergeCuboidDataFromHBaseStep(seg, formattedTables, hFilePath);
+        result.addTask(writeHFileStep);
+
+        return writeHFileStep;
     }
 
     Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
@@ -209,7 +228,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
             }
         } else {
             // create htable step
-            result.addTask(createCreateHTableStep(seg, jobId));
+            result.addTask(createCreateHTableStep(seg));
             baseCuboidStep = createInMemCubingStep(seg, intermediateHiveTableLocation, intermediateHiveTableName, cuboidOutputTempPath, result.getId());
             result.addTask(baseCuboidStep);
             // bulk load step
@@ -225,7 +244,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
 
         result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath));
         // create htable step
-        result.addTask(createCreateHTableStep(seg, jobId));
+        result.addTask(createCreateHTableStep(seg));
         // generate hfiles step
         final MapReduceExecutable convertCuboidToHfileStep = createConvertCuboidToHfileStep(seg, cuboidPath, jobId);
         result.addTask(convertCuboidToHfileStep);
@@ -415,7 +434,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         return rowkeyDistributionStep;
     }
 
-    private HadoopShellExecutable createCreateHTableStep(CubeSegment seg, String jobId) {
+    private HadoopShellExecutable createCreateHTableStep(CubeSegment seg) {
         HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
         createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
         StringBuilder cmd = new StringBuilder();
@@ -424,7 +443,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg) + "/part-r-00000");
         appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
         appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(inMemoryCubing()));
-        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(seg, jobId));
+        appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(engineConfig.getConfig().getCubingInMemSamplingPercent()));
 
         createHtableStep.setJobParams(cmd.toString());
         createHtableStep.setJobClass(CreateHTableJob.class);
@@ -487,6 +506,18 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         return result;
     }
 
+
+    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
+        MergeStatisticsStep result = new MergeStatisticsStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+        result.setCubeName(seg.getCubeInstance().getName());
+        result.setSegmentId(seg.getUuid());
+        result.setMergingSegmentIds(mergingSegmentIds);
+        result.setMergedStatisticsPath(mergedStatisticsFolder);
+        return result;
+    }
+
+
     private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
         MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
         mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
@@ -505,7 +536,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
     }
 
 
-    private MapReduceExecutable createMergeCuboidDataFromHBaseStep(CubeSegment seg, String inputPath, String outputPath) {
+    private MapReduceExecutable createMergeCuboidDataFromHBaseStep(CubeSegment seg, String inputPath, String hFilePath) {
         MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
         mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
         StringBuilder cmd = new StringBuilder();
@@ -514,7 +545,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
         appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", outputPath);
+        appendExecCmdParameters(cmd, "output", hFilePath);
+        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
         appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
 
         mergeCuboidDataStep.setMapReduceParams(cmd.toString());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
index 948f25e..b32baf9 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
@@ -89,10 +89,10 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, HCatRecord, IntWr
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
         Iterator<Integer> it = hllcMap.keySet().iterator();
+        ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
         while (it.hasNext()) {
             int key = it.next();
             HyperLogLogPlusCounter hllc = hllcMap.get(key);
-            ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
             buf.clear();
             hllc.writeRegisters(buf);
             buf.flip();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index 9e4b363..bc10032 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -18,10 +18,12 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
-import java.io.IOException;
-
 import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -31,14 +33,19 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 /**
  * @author yangli9
  */
@@ -63,7 +70,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             String cubeName = getOptionValue(OPTION_CUBE_NAME);
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            String intermediateTable = getOptionValue(OPTION_TABLE_NAME);;
+            String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
+            ;
             String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
             String statistics_enabled = getOptionValue(OPTION_STATISTICS_ENABLED);
             String statistics_output = getOptionValue(OPTION_STATISTICS_OUTPUT);
@@ -73,6 +81,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             // add metadata to distributed cache
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
+            CubeSegment newSegment = cubeInstance.getSegment(segmentName, SegmentStatusEnum.NEW);
 
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
@@ -82,14 +91,21 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             log.info("Starting: " + job.getJobName());
 
             setJobClasspath(job);
-            
+
             setupMapper(intermediateTable);
             setupReducer(output);
 
             // CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment();
             attachKylinPropsAndMetadata(cubeInstance, job.getConfiguration());
 
-            return waitForCompletion(job);
+            int result = waitForCompletion(job);
+
+            if(Boolean.parseBoolean(statistics_enabled)) {
+                putStatisticsToResourceStore(statistics_output, newSegment);
+            }
+
+            return result;
+
 
         } catch (Exception e) {
             logger.error("error in FactDistinctColumnsJob", e);
@@ -105,7 +121,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
         String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
         HCatInputFormat.setInput(job, dbTableNames[0],
                 dbTableNames[1]);
-        
+
         job.setInputFormatClass(HCatInputFormat.class);
         job.setMapperClass(FactDistinctHiveColumnsMapper.class);
         job.setCombinerClass(FactDistinctColumnsCombiner.class);
@@ -127,6 +143,23 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
         deletePath(job.getConfiguration(), output);
     }
 
+    private void putStatisticsToResourceStore(String statisticsFolder, CubeSegment cubeSegment) throws IOException {
+        Path statisticsFilePath = new Path(statisticsFolder, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+        FileSystem fs = HadoopUtil.getFileSystem("hdfs:///" + statisticsFilePath);
+        if (!fs.exists(statisticsFilePath))
+            throw new IOException("File " + statisticsFilePath + " does not exists;");
+
+        FSDataInputStream is = fs.open(statisticsFilePath);
+        try {
+            // put the statistics to metadata store
+            String statisticsFileName = ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment.getUuid() + ".seq";
+            ResourceStore rs = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
+            rs.putResource(statisticsFileName, is, System.currentTimeMillis());
+        } finally {
+            IOUtils.closeStream(is);
+        }
+    }
+
     public static void main(String[] args) throws Exception {
         FactDistinctColumnsJob job = new FactDistinctColumnsJob();
         int exitCode = ToolRunner.run(job, args);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index c671da9..5d29e1b 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -24,24 +24,23 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.*;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.mr.KylinReducer;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.*;
 
 /**
@@ -53,7 +52,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
     private boolean collectStatistics = false;
     private String statisticsOutput = null;
     private List<Long> baseCuboidRowCountInMappers;
-    private Map<Long, Long> rowCountInCuboids;
+    //    private Map<Long, Long> rowCountInCuboids;
     protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = null;
     protected long baseCuboidId;
     protected CubeDesc cubeDesc;
@@ -78,7 +77,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
 
         if (collectStatistics) {
             baseCuboidRowCountInMappers = Lists.newArrayList();
-            rowCountInCuboids = Maps.newHashMap();
+//            rowCountInCuboids = Maps.newHashMap();
             cuboidHLLMap = Maps.newHashMap();
             SAMPING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
         }
@@ -113,23 +112,23 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
             // for hll
             long cuboidId = 0 - key.get();
 
-                for (Text value : values) {
-                    HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
-                    ByteArray byteArray = new ByteArray(value.getBytes());
-                    hll.readRegisters(byteArray.asBuffer());
+            for (Text value : values) {
+                HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
+                ByteArray byteArray = new ByteArray(value.getBytes());
+                hll.readRegisters(byteArray.asBuffer());
 
-                    totalRowsBeforeMerge += hll.getCountEstimate();
+                totalRowsBeforeMerge += hll.getCountEstimate();
 
-                    if (cuboidId == baseCuboidId) {
-                        baseCuboidRowCountInMappers.add(hll.getCountEstimate());
-                    }
+                if (cuboidId == baseCuboidId) {
+                    baseCuboidRowCountInMappers.add(hll.getCountEstimate());
+                }
 
-                    if (cuboidHLLMap.get(cuboidId) != null) {
-                        cuboidHLLMap.get(cuboidId).merge(hll);
-                    } else {
-                        cuboidHLLMap.put(cuboidId, hll);
-                    }
+                if (cuboidHLLMap.get(cuboidId) != null) {
+                    cuboidHLLMap.get(cuboidId).merge(hll);
+                } else {
+                    cuboidHLLMap.put(cuboidId, hll);
                 }
+            }
         }
 
     }
@@ -139,12 +138,8 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
 
         //output the hll info;
         if (collectStatistics) {
-            for (Long cuboidId : cuboidHLLMap.keySet()) {
-                rowCountInCuboids.put(cuboidId, cuboidHLLMap.get(cuboidId).getCountEstimate());
-            }
-
             writeMapperAndCuboidStatistics(context); // for human check
-            writeCuboidStatistics(context); // for CreateHTableJob
+            writeCuboidStatistics(context.getConfiguration(), statisticsOutput, cuboidHLLMap); // for CreateHTableJob
         }
     }
 
@@ -157,7 +152,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
             String msg;
 
             List<Long> allCuboids = new ArrayList<Long>();
-            allCuboids.addAll(rowCountInCuboids.keySet());
+            allCuboids.addAll(cuboidHLLMap.keySet());
             Collections.sort(allCuboids);
 
             msg = "Total cuboid number: \t" + allCuboids.size();
@@ -175,8 +170,8 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
 
             long grantTotal = 0;
             for (long i : allCuboids) {
-                grantTotal += rowCountInCuboids.get(i);
-                msg = "Cuboid " + i + " row count is: \t " + rowCountInCuboids.get(i);
+                grantTotal += cuboidHLLMap.get(i).getCountEstimate();
+                msg = "Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate();
                 writeLine(out, msg);
             }
 
@@ -200,19 +195,23 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
 
     }
 
-    private void writeCuboidStatistics(Context context) throws IOException {
-        Configuration conf = context.getConfiguration();
-        Path seqFilePath = new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+    public static void writeCuboidStatistics(Configuration conf, String outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap) throws IOException {
+        Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
         SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                 SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class),
-                SequenceFile.Writer.valueClass(LongWritable.class));
+                SequenceFile.Writer.valueClass(BytesWritable.class));
 
         List<Long> allCuboids = new ArrayList<Long>();
-        allCuboids.addAll(rowCountInCuboids.keySet());
+        allCuboids.addAll(cuboidHLLMap.keySet());
         Collections.sort(allCuboids);
+
+        ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
         try {
             for (long i : allCuboids) {
-                writer.append(new LongWritable(i), new LongWritable((long) (rowCountInCuboids.get(i) *100 / SAMPING_PERCENTAGE)));
+                valueBuf.clear();
+                cuboidHLLMap.get(i).writeRegisters(valueBuf);
+                valueBuf.flip();
+                writer.append(new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()));
             }
         } finally {
             writer.close();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
index b49af1a..4f50279 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
@@ -22,7 +22,11 @@ import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.hadoop.io.Text;
@@ -55,10 +59,12 @@ public class MergeCuboidFromHBaseJob extends CuboidJob {
             options.addOption(OPTION_SEGMENT_NAME);
             options.addOption(OPTION_INPUT_PATH);
             options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_HTABLE_NAME);
             parseOptions(options, args);
 
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
             String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
+            String htableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
             KylinConfig config = KylinConfig.getInstanceFromEnv();
             CubeManager cubeMgr = CubeManager.getInstance(config);
             CubeInstance cube = cubeMgr.getCube(cubeName);
@@ -87,15 +93,18 @@ public class MergeCuboidFromHBaseJob extends CuboidJob {
                 scans.add(scan);
             }
 
-            TableMapReduceUtil.initTableMapperJob(scans, MergeCuboidFromHBaseMapper.class, Text.class,
+            TableMapReduceUtil.initTableMapperJob(scans, MergeCuboidFromHBaseMapper.class, ImmutableBytesWritable.class,
                     Text.class, job);
 
+            /*
             // Reducer - only one
             job.setReducerClass(CuboidReducer.class);
             job.setOutputFormatClass(SequenceFileOutputFormat.class);
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(Text.class);
 
+*/
+
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
@@ -103,7 +112,16 @@ public class MergeCuboidFromHBaseJob extends CuboidJob {
             // add metadata to distributed cache
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 
-            setReduceTaskNum(job, config, cubeName, 0);
+
+            HTable htable = new HTable(conf, htableName);
+            HFileOutputFormat.configureIncrementalLoad(job,
+                    htable);
+
+
+            // set Reducer; This need be after configureIncrementalLoad, to overwrite the default reducer class
+            job.setReducerClass(InMemCuboidReducer.class);
+            job.setOutputKeyClass(ImmutableBytesWritable.class);
+            job.setOutputValueClass(KeyValue.class);
 
             this.deletePath(job.getConfiguration(), output);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
index 7c673da..59fa03d 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseMapper.java
@@ -59,7 +59,7 @@ import java.util.List;
 /**
  * @author shaoshi
  */
-public class MergeCuboidFromHBaseMapper extends TableMapper<Text, Text> {
+public class MergeCuboidFromHBaseMapper extends TableMapper<ImmutableBytesWritable, Text> {
 
     private KylinConfig config;
     private String cubeName;
@@ -71,7 +71,8 @@ public class MergeCuboidFromHBaseMapper extends TableMapper<Text, Text> {
     private CubeSegment sourceCubeSegment;// Must be unique during a mapper's
     // life cycle
 
-    private Text outputKey = new Text();
+//    private Text outputKey = new Text();
+    private ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
 
     private byte[] newKeyBuf;
     private RowKeySplitter rowKeySplitter;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java
new file mode 100644
index 0000000..5fe3f96
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cubev2;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class MergeStatisticsStep extends AbstractExecutable {
+
+    private static final String CUBE_NAME = "cubeName";
+    private static final String SEGMENT_ID = "segmentId";
+    private static final String MERGING_SEGMENT_IS = "mergingSegmentIds";
+    private static final String MERGED_STATISTICS_PATH = "mergedStatisticsPath";
+    protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
+
+    public MergeStatisticsStep() {
+        super();
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        KylinConfig kylinConf = context.getConfig();
+        final CubeManager mgr = CubeManager.getInstance(kylinConf);
+        final CubeInstance cube = mgr.getCube(getCubeName());
+        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
+
+        Configuration conf = new Configuration();
+        ResourceStore rs = ResourceStore.getStore(kylinConf);
+        try {
+
+            for (String segmentId : this.getMergingSegmentIds()) {
+                String fileKey = ResourceStore.CUBE_STATISTICS_ROOT + "/" + getCubeName() + "/" + segmentId + ".seq";
+                InputStream is = rs.getResource(fileKey);
+                File tempFile = null;
+                FileOutputStream tempFileStream = null;
+                try {
+                    tempFile = File.createTempFile(segmentId, ".seq");
+                    tempFileStream = new FileOutputStream(tempFile);
+                    org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+                } finally {
+                    IOUtils.closeStream(is);
+                    IOUtils.closeStream(tempFileStream);
+                }
+
+                FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+                SequenceFile.Reader reader = null;
+                try {
+                    reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
+                    LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                    BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                    while (reader.next(key, value)) {
+                        HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
+                        ByteArray byteArray = new ByteArray(value.getBytes());
+                        hll.readRegisters(byteArray.asBuffer());
+
+                        if (cuboidHLLMap.get(key.get()) != null) {
+                            cuboidHLLMap.get(key.get()).merge(hll);
+                        } else {
+                            cuboidHLLMap.put(key.get(), hll);
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    throw e;
+                } finally {
+                    IOUtils.closeStream(reader);
+                }
+            }
+
+            FactDistinctColumnsReducer.writeCuboidStatistics(conf, getMergedStatisticsPath(), cuboidHLLMap);
+            Path statisticsFilePath = new Path(getMergedStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+            FileSystem fs = statisticsFilePath.getFileSystem(conf);
+            FSDataInputStream is = fs.open(statisticsFilePath);
+            try {
+                // put the statistics to metadata store
+                String statisticsFileName = ResourceStore.CUBE_STATISTICS_ROOT + "/" + getCubeName() + "/" + newSegment.getUuid() + ".seq";
+                rs.putResource(statisticsFileName, is, System.currentTimeMillis());
+            } finally {
+                IOUtils.closeStream(is);
+            }
+
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (IOException e) {
+            logger.error("fail to merge cuboid statistics", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+    }
+
+
+    public void setCubeName(String cubeName) {
+        this.setParam(CUBE_NAME, cubeName);
+    }
+
+    private String getCubeName() {
+        return getParam(CUBE_NAME);
+    }
+
+    public void setSegmentId(String segmentId) {
+        this.setParam(SEGMENT_ID, segmentId);
+    }
+
+    private String getSegmentId() {
+        return getParam(SEGMENT_ID);
+    }
+
+    public void setMergingSegmentIds(List<String> ids) {
+        setParam(MERGING_SEGMENT_IS, StringUtils.join(ids, ","));
+    }
+
+    private List<String> getMergingSegmentIds() {
+        final String ids = getParam(MERGING_SEGMENT_IS);
+        if (ids != null) {
+            final String[] splitted = StringUtils.split(ids, ",");
+            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+            for (String id : splitted) {
+                result.add(id);
+            }
+            return result;
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+    public void setMergedStatisticsPath(String path) {
+        setParam(MERGED_STATISTICS_PATH, path);
+    }
+
+    private String getMergedStatisticsPath() {
+        return getParam(MERGED_STATISTICS_PATH);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e008e2cc/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
index cbe8c10..6e884d5 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
@@ -33,12 +33,16 @@ import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -46,7 +50,6 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.job.tools.DeployCoprocessorCLI;
 import org.apache.kylin.job.tools.LZOSupportnessChecker;
@@ -55,7 +58,10 @@ import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -75,6 +81,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
     CubeDesc cubeDesc = null;
     String segmentName = null;
     KylinConfig kylinConfig;
+    private int SAMPING_PERCENTAGE = 5;
 
     @Override
     public int run(String[] args) throws Exception {
@@ -85,12 +92,15 @@ public class CreateHTableJob extends AbstractHadoopJob {
         options.addOption(OPTION_PARTITION_FILE_PATH);
         options.addOption(OPTION_HTABLE_NAME);
         options.addOption(OPTION_STATISTICS_ENABLED);
-        options.addOption(OPTION_STATISTICS_OUTPUT);
+        options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
         parseOptions(options, args);
 
         Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
         boolean statistics_enabled = Boolean.parseBoolean(getOptionValue(OPTION_STATISTICS_ENABLED));
-        Path statisticsFilePath = new Path(getOptionValue(OPTION_STATISTICS_OUTPUT), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
+
+        String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
+
+        SAMPING_PERCENTAGE = Integer.parseInt(statistics_sampling_percent);
 
         String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
         kylinConfig = KylinConfig.getInstanceFromEnv();
@@ -133,7 +143,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
 
             byte[][] splitKeys;
             if (statistics_enabled) {
-                splitKeys = getSplitsFromCuboidStatistics(conf, statisticsFilePath);
+                splitKeys = getSplitsFromCuboidStatistics(conf);
             } else {
                 splitKeys = getSplits(conf, partitionFilePath);
             }
@@ -196,7 +206,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
 
 
     @SuppressWarnings("deprecation")
-    protected byte[][] getSplitsFromCuboidStatistics(Configuration conf, Path statisticsFilePath) throws IOException {
+    protected byte[][] getSplitsFromCuboidStatistics(Configuration conf) throws IOException {
 
         List<Integer> rowkeyColumnSize = Lists.newArrayList();
         CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
@@ -217,20 +227,33 @@ public class CreateHTableJob extends AbstractHadoopJob {
         Map<Long, Long> cuboidSizeMap = Maps.newHashMap();
         long totalSizeInM = 0;
 
-        SequenceFile.Reader reader = null;
-
-        FileSystem fs = statisticsFilePath.getFileSystem(conf);
-        if (fs.exists(statisticsFilePath) == false) {
-            System.err.println("Path " + statisticsFilePath + " not found, no region split, HTable will be one region");
-            return null;
+        ResourceStore rs = ResourceStore.getStore(kylinConfig);
+        String fileKey = ResourceStore.CUBE_STATISTICS_ROOT + "/" + cube.getName() + "/" + cubeSegment.getUuid() + ".seq";
+        InputStream is = rs.getResource(fileKey);
+        File tempFile = null;
+        FileOutputStream tempFileStream = null;
+        try {
+            tempFile = File.createTempFile(cubeSegment.getUuid(), ".seq");
+            tempFileStream = new FileOutputStream(tempFile);
+            org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+        } finally {
+            IOUtils.closeStream(is);
+            IOUtils.closeStream(tempFileStream);
         }
 
+        FileSystem fs = HadoopUtil.getFileSystem("file:///" +tempFile.getAbsolutePath());
+        SequenceFile.Reader reader = null;
         try {
-            reader = new SequenceFile.Reader(fs, statisticsFilePath, conf);
+            reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
             LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-            LongWritable value = (LongWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
             while (reader.next(key, value)) {
-                cuboidSizeMap.put(key.get(), value.get());
+                HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
+                ByteArray byteArray = new ByteArray(value.getBytes());
+                hll.readRegisters(byteArray.asBuffer());
+
+                cuboidSizeMap.put(key.get(), hll.getCountEstimate() * 100 / SAMPING_PERCENTAGE);
+
             }
         } catch (Exception e) {
             e.printStackTrace();
@@ -263,7 +286,6 @@ public class CreateHTableJob extends AbstractHadoopJob {
         List<Long> regionSplit = Lists.newArrayList();
 
 
-
         long size = 0;
         int regionIndex = 0;
         int cuboidCount = 0;