Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:45 UTC

[33/50] [abbrv] incubator-kylin git commit: KYLIN-750 Merge cube segments from HBase table, refine code

KYLIN-750 Merge cube segments from HBase table, refine code


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d1e4b9be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d1e4b9be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d1e4b9be

Branch: refs/heads/streaming-localdict
Commit: d1e4b9beb4efb298eb977df72e3a234e1e3adfab
Parents: 73c275e
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Tue May 12 14:07:45 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Tue May 12 14:10:35 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/Array.java     |  4 ---
 .../java/org/apache/kylin/cube/CubeSegment.java | 22 ++++++++----
 .../apache/kylin/job/cube/CubingJobBuilder.java |  1 -
 .../job/hadoop/cube/FactDistinctColumnsJob.java |  7 ++--
 .../hadoop/cube/FactDistinctColumnsReducer.java | 10 +++---
 .../cube/FactDistinctHiveColumnsMapper.java     |  2 +-
 .../job/hadoop/cube/StorageCleanupJob.java      | 37 +++++++++++---------
 .../hadoop/cubev2/MapContextGTRecordWriter.java |  4 +--
 .../hadoop/cubev2/MergeCuboidFromHBaseJob.java  | 10 +++---
 .../job/hadoop/cubev2/MergeStatisticsStep.java  | 29 +++++++++------
 .../kylin/job/hadoop/hbase/CreateHTableJob.java | 23 ++++++------
 11 files changed, 80 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/common/src/main/java/org/apache/kylin/common/util/Array.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/Array.java b/common/src/main/java/org/apache/kylin/common/util/Array.java
index 7f64a65..c48bb3d 100644
--- a/common/src/main/java/org/apache/kylin/common/util/Array.java
+++ b/common/src/main/java/org/apache/kylin/common/util/Array.java
@@ -18,10 +18,6 @@
 
 package org.apache.kylin.common.util;
 
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-import org.apache.commons.collections.PredicateUtils;
-
 import java.util.Arrays;
 
 /*

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 382f5c2..fbc014b 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -18,23 +18,24 @@
 
 package org.apache.kylin.cube;
 
-import java.text.SimpleDateFormat;
-import java.util.Collection;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.concurrent.ConcurrentHashMap;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonBackReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Objects;
+import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.IDictionaryAware;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware {
 
@@ -335,5 +336,12 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware {
     public void setSnapshots(ConcurrentHashMap<String, String> snapshots) {
         this.snapshots = snapshots;
     }
-    
+
+    public String getStatisticsResourcePath() {
+        return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid());
+    }
+
+    public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId) {
+        return ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + cubeSegmentId + ".seq";
+    }
 }
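
Note on the two helpers added above: they centralize the statistics path convention that the remaining hunks in this commit switch to, replacing hand-built string concatenations in FactDistinctColumnsJob, MergeStatisticsStep and CreateHTableJob. A minimal usage sketch, assuming ResourceStore.CUBE_STATISTICS_ROOT is "/cube_statistics" (the constant's actual value is defined in ResourceStore and not shown in this diff):

    // hypothetical cube name and segment uuid, for illustration only
    String path = CubeSegment.getStatisticsResourcePath("sample_cube",
            "d1e4b9be-1234-5678-9abc-def012345678");
    // -> "/cube_statistics/sample_cube/d1e4b9be-1234-5678-9abc-def012345678.seq"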

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index 6593ed2..e534441 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -443,7 +443,6 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg) + "/part-r-00000");
         appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
         appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(inMemoryCubing()));
-        appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(engineConfig.getConfig().getCubingInMemSamplingPercent()));
 
         createHtableStep.setJobParams(cmd.toString());
         createHtableStep.setJobClass(CreateHTableJob.class);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index bc10032..f8863a5 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.job.hadoop.cube;
 
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -100,7 +99,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
 
             int result = waitForCompletion(job);
 
-            if(Boolean.parseBoolean(statistics_enabled)) {
+            if(result == 0 && Boolean.parseBoolean(statistics_enabled)) {
                 putStatisticsToResourceStore(statistics_output, newSegment);
             }
 
@@ -145,14 +144,14 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
 
     private void putStatisticsToResourceStore(String statisticsFolder, CubeSegment cubeSegment) throws IOException {
         Path statisticsFilePath = new Path(statisticsFolder, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
-        FileSystem fs = HadoopUtil.getFileSystem("hdfs:///" + statisticsFilePath);
+        FileSystem fs = FileSystem.get(HadoopUtil.getCurrentConfiguration());
         if (!fs.exists(statisticsFilePath))
             throw new IOException("File " + statisticsFilePath + " does not exists;");
 
         FSDataInputStream is = fs.open(statisticsFilePath);
         try {
             // put the statistics to metadata store
-            String statisticsFileName = ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment.getUuid() + ".seq";
+            String statisticsFileName = cubeSegment.getStatisticsResourcePath();
             ResourceStore rs = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
             rs.putResource(statisticsFileName, is, System.currentTimeMillis());
         } finally {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index 5d29e1b..62bbd74 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -52,7 +52,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
     private boolean collectStatistics = false;
     private String statisticsOutput = null;
     private List<Long> baseCuboidRowCountInMappers;
-    //    private Map<Long, Long> rowCountInCuboids;
     protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = null;
     protected long baseCuboidId;
     protected CubeDesc cubeDesc;
@@ -77,7 +76,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
 
         if (collectStatistics) {
             baseCuboidRowCountInMappers = Lists.newArrayList();
-//            rowCountInCuboids = Maps.newHashMap();
             cuboidHLLMap = Maps.newHashMap();
             SAMPING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
         }
@@ -113,7 +111,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
             long cuboidId = 0 - key.get();
 
             for (Text value : values) {
-                HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
+                HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
                 ByteArray byteArray = new ByteArray(value.getBytes());
                 hll.readRegisters(byteArray.asBuffer());
 
@@ -139,7 +137,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
         //output the hll info;
         if (collectStatistics) {
             writeMapperAndCuboidStatistics(context); // for human check
-            writeCuboidStatistics(context.getConfiguration(), statisticsOutput, cuboidHLLMap); // for CreateHTableJob
+            writeCuboidStatistics(context.getConfiguration(), statisticsOutput, cuboidHLLMap, SAMPING_PERCENTAGE); // for CreateHTableJob
         }
     }
 
@@ -195,7 +193,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
 
     }
 
-    public static void writeCuboidStatistics(Configuration conf, String outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap) throws IOException {
+    public static void writeCuboidStatistics(Configuration conf, String outputPath, Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
         Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
         SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                 SequenceFile.Writer.file(seqFilePath), SequenceFile.Writer.keyClass(LongWritable.class),
@@ -205,6 +203,8 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
         allCuboids.addAll(cuboidHLLMap.keySet());
         Collections.sort(allCuboids);
 
+        // persist the sample percentage with key 0
+        writer.append(new LongWritable(0l), new BytesWritable(Bytes.toBytes(samplingPercentage)));
         ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
         try {
             for (long i : allCuboids) {
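
With the key-0 record above, each segment's statistics sequence file has a two-part layout: key 0 holds the sampling percentage as a 4-byte int, and every other key is a cuboid id whose value is the serialized register array of its HyperLogLogPlusCounter. Reserving id 0 is safe because real cuboid ids are positive. A minimal reader sketch under those assumptions (StatisticsFileDump is a hypothetical helper, not part of this commit):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
    import org.apache.kylin.common.util.Bytes;

    public class StatisticsFileDump {

        public static void dump(Configuration conf, Path seqFilePath) throws IOException {
            FileSystem fs = seqFilePath.getFileSystem(conf);
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, seqFilePath, conf);
            try {
                LongWritable key = new LongWritable();
                BytesWritable value = new BytesWritable();
                while (reader.next(key, value)) {
                    if (key.get() == 0L) {
                        // reserved record, written first by writeCuboidStatistics
                        System.out.println("sampling percentage: " + Bytes.toInt(value.getBytes()));
                    } else {
                        // precision must match the writer's (14 after this commit)
                        HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
                        hll.readRegisters(ByteBuffer.wrap(value.getBytes()));
                        System.out.println("cuboid " + key.get() + ": ~" + hll.getCountEstimate() + " sampled rows");
                    }
                }
            } finally {
                reader.close();
            }
        }
    }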

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
index 0b75084..20df4c4 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
@@ -80,7 +80,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
 
             allCuboidsHLL = new HyperLogLogPlusCounter[cuboidIds.length];
             for (int i = 0; i < cuboidIds.length; i++) {
-                allCuboidsHLL[i] = new HyperLogLogPlusCounter(16);
+                allCuboidsHLL[i] = new HyperLogLogPlusCounter(14);
             }
 
             hf = Hashing.md5();
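
On the 16 -> 14 precision change (applied here and at the three other HLL construction sites in this commit): a HyperLogLogPlusCounter with precision p keeps 2^p registers, and the standard error of its estimate is about 1.04/sqrt(2^p):

    p = 16: 65536 registers, error ~ 1.04/256 ~ 0.41%
    p = 14: 16384 registers, error ~ 1.04/128 ~ 0.81%

So precision 14 needs a quarter of the memory per counter for roughly double the (still sub-percent) error, which matters here because the mapper holds one counter per cuboid. Reader and writer must agree on the precision for readRegisters() to work, which is presumably why all four sites change in the same commit.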

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
index 09fba01..2f80f0e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
@@ -18,18 +18,6 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -43,6 +31,18 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -118,8 +118,10 @@ public class StorageCleanupJob extends AbstractHadoopJob {
         for (CubeInstance cube : cubeMgr.listAllCubes()) {
             for (CubeSegment seg : cube.getSegments()) {
                 String tablename = seg.getStorageLocationIdentifier();
-                allTablesNeedToBeDropped.remove(tablename);
-                log.info("Remove table " + tablename + " from drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus());
+                if (allTablesNeedToBeDropped.contains(tablename)) {
+                    allTablesNeedToBeDropped.remove(tablename);
+                    log.info("Exclude table " + tablename + " from drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus());
+                }
             }
         }
 
@@ -127,8 +129,11 @@ public class StorageCleanupJob extends AbstractHadoopJob {
         for (IIInstance ii : iiManager.listAllIIs()) {
             for (IISegment seg : ii.getSegments()) {
                 String tablename = seg.getStorageLocationIdentifier();
-                allTablesNeedToBeDropped.remove(tablename);
-                log.info("Remove table " + tablename + " from drop list, as the table belongs to ii " + ii.getName() + " with status " + ii.getStatus());
+
+                if (allTablesNeedToBeDropped.contains(tablename)) {
+                    allTablesNeedToBeDropped.remove(tablename);
+                    log.info("Exclude table " + tablename + " from drop list, as the table belongs to ii " + ii.getName() + " with status " + ii.getStatus());
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
index 3ba80d1..df3f345 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
@@ -23,7 +23,7 @@ import java.util.BitSet;
 public class MapContextGTRecordWriter implements IGTRecordWriter {
 
     private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
-    protected MapContext mapContext;
+    protected MapContext<?, ?, ImmutableBytesWritable, Text> mapContext;
     private Long lastCuboidId;
     protected CubeSegment cubeSegment;
     protected CubeDesc cubeDesc;
@@ -37,7 +37,7 @@ public class MapContextGTRecordWriter implements IGTRecordWriter {
     private Text outputValue = new Text();
     long cuboidRowCount = 0;
 
-    public MapContextGTRecordWriter(MapContext mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
+    public MapContextGTRecordWriter(MapContext<?, ?, ImmutableBytesWritable, Text> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
         this.mapContext = mapContext;
         this.cubeDesc = cubeDesc;
         this.cubeSegment = cubeSegment;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
index 4f50279..b2172bc 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeCuboidFromHBaseJob.java
@@ -18,6 +18,9 @@
 
 package org.apache.kylin.job.hadoop.cubev2;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -28,21 +31,16 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.StringSplitter;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.cube.CuboidJob;
-import org.apache.kylin.job.hadoop.cube.CuboidReducer;
-
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * @author shaoshi

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java
index 5fe3f96..64ea14c 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MergeStatisticsStep.java
@@ -34,6 +34,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -78,8 +79,9 @@ public class MergeStatisticsStep extends AbstractExecutable {
         ResourceStore rs = ResourceStore.getStore(kylinConf);
         try {
 
+            int averageSamplingPercentage = 0;
             for (String segmentId : this.getMergingSegmentIds()) {
-                String fileKey = ResourceStore.CUBE_STATISTICS_ROOT + "/" + getCubeName() + "/" + segmentId + ".seq";
+                String fileKey = CubeSegment.getStatisticsResourcePath(getCubeName(), segmentId);
                 InputStream is = rs.getResource(fileKey);
                 File tempFile = null;
                 FileOutputStream tempFileStream = null;
@@ -99,14 +101,19 @@ public class MergeStatisticsStep extends AbstractExecutable {
                     LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
                     BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
                     while (reader.next(key, value)) {
-                        HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
-                        ByteArray byteArray = new ByteArray(value.getBytes());
-                        hll.readRegisters(byteArray.asBuffer());
-
-                        if (cuboidHLLMap.get(key.get()) != null) {
-                            cuboidHLLMap.get(key.get()).merge(hll);
+                        if (key.get() == 0l) {
+                            // sampling percentage;
+                            averageSamplingPercentage += Bytes.toInt(value.getBytes());
                         } else {
-                            cuboidHLLMap.put(key.get(), hll);
+                            HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
+                            ByteArray byteArray = new ByteArray(value.getBytes());
+                            hll.readRegisters(byteArray.asBuffer());
+
+                            if (cuboidHLLMap.get(key.get()) != null) {
+                                cuboidHLLMap.get(key.get()).merge(hll);
+                            } else {
+                                cuboidHLLMap.put(key.get(), hll);
+                            }
                         }
                     }
                 } catch (Exception e) {
@@ -116,14 +123,14 @@ public class MergeStatisticsStep extends AbstractExecutable {
                     IOUtils.closeStream(reader);
                 }
             }
-
-            FactDistinctColumnsReducer.writeCuboidStatistics(conf, getMergedStatisticsPath(), cuboidHLLMap);
+            averageSamplingPercentage = averageSamplingPercentage / this.getMergingSegmentIds().size();
+            FactDistinctColumnsReducer.writeCuboidStatistics(conf, getMergedStatisticsPath(), cuboidHLLMap, averageSamplingPercentage);
             Path statisticsFilePath = new Path(getMergedStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
             FileSystem fs = statisticsFilePath.getFileSystem(conf);
             FSDataInputStream is = fs.open(statisticsFilePath);
             try {
                 // put the statistics to metadata store
-                String statisticsFileName = ResourceStore.CUBE_STATISTICS_ROOT + "/" + getCubeName() + "/" + newSegment.getUuid() + ".seq";
+                String statisticsFileName = newSegment.getStatisticsResourcePath();
                 rs.putResource(statisticsFileName, is, System.currentTimeMillis());
             } finally {
                 IOUtils.closeStream(is);
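
The merged percentage computed above is the integer average of the key-0 records of the merging segments: merging two segments sampled at 25% and 50%, for instance, persists (25 + 50) / 2 = 37 for the new segment, and downstream size estimates then scale by 100/37. Strictly speaking there is no exact rescaling for HLL registers merged across different sampling rates, but segments of one cube are normally built with the same configured percentage, in which case the average is exact.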

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1e4b9be/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
index 6e884d5..8f802b7 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
@@ -81,7 +81,6 @@ public class CreateHTableJob extends AbstractHadoopJob {
     CubeDesc cubeDesc = null;
     String segmentName = null;
     KylinConfig kylinConfig;
-    private int SAMPING_PERCENTAGE = 5;
 
     @Override
     public int run(String[] args) throws Exception {
@@ -92,16 +91,11 @@ public class CreateHTableJob extends AbstractHadoopJob {
         options.addOption(OPTION_PARTITION_FILE_PATH);
         options.addOption(OPTION_HTABLE_NAME);
         options.addOption(OPTION_STATISTICS_ENABLED);
-        options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
         parseOptions(options, args);
 
         Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
         boolean statistics_enabled = Boolean.parseBoolean(getOptionValue(OPTION_STATISTICS_ENABLED));
 
-        String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
-
-        SAMPING_PERCENTAGE = Integer.parseInt(statistics_sampling_percent);
-
         String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
         kylinConfig = KylinConfig.getInstanceFromEnv();
         CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
@@ -228,7 +222,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         long totalSizeInM = 0;
 
         ResourceStore rs = ResourceStore.getStore(kylinConfig);
-        String fileKey = ResourceStore.CUBE_STATISTICS_ROOT + "/" + cube.getName() + "/" + cubeSegment.getUuid() + ".seq";
+        String fileKey = cubeSegment.getStatisticsResourcePath();
         InputStream is = rs.getResource(fileKey);
         File tempFile = null;
         FileOutputStream tempFileStream = null;
@@ -241,18 +235,23 @@ public class CreateHTableJob extends AbstractHadoopJob {
             IOUtils.closeStream(tempFileStream);
         }
 
-        FileSystem fs = HadoopUtil.getFileSystem("file:///" +tempFile.getAbsolutePath());
+        FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
         SequenceFile.Reader reader = null;
         try {
             reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
             LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
             BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+            int samplingPercentage = 25;
             while (reader.next(key, value)) {
-                HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
-                ByteArray byteArray = new ByteArray(value.getBytes());
-                hll.readRegisters(byteArray.asBuffer());
+                if (key.get() == 0l) {
+                    samplingPercentage = Bytes.toInt(value.getBytes());
+                } else {
+                    HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
+                    ByteArray byteArray = new ByteArray(value.getBytes());
+                    hll.readRegisters(byteArray.asBuffer());
 
-                cuboidSizeMap.put(key.get(), hll.getCountEstimate() * 100 / SAMPING_PERCENTAGE);
+                    cuboidSizeMap.put(key.get(), hll.getCountEstimate() * 100 / samplingPercentage);
+                }
 
             }
         } catch (Exception e) {