You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:22 UTC

[10/50] [abbrv] incubator-kylin git commit: KYLIN-673 sampling part data with a configurable percentage.

KYLIN-673 sampling part data with a configurable percentage.


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f2562639
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f2562639
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f2562639

Branch: refs/heads/streaming-localdict
Commit: f25626399b4764310fea7a8876add816c5010196
Parents: 6533a33
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Wed May 6 10:10:45 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Wed May 6 10:11:09 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |  9 ++++++---
 conf/kylin.properties                           |  4 +++-
 .../test_case_data/sandbox/kylin.properties     |  3 +++
 .../kylin/job/constant/BatchConstants.java      |  2 +-
 .../apache/kylin/job/cube/CubingJobBuilder.java |  2 +-
 .../kylin/job/hadoop/AbstractHadoopJob.java     |  2 +-
 .../job/hadoop/cube/FactDistinctColumnsJob.java |  6 +++---
 .../hadoop/cube/FactDistinctColumnsReducer.java | 21 +++++---------------
 .../cube/FactDistinctHiveColumnsMapper.java     | 19 +++++++-----------
 9 files changed, 30 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index f1e5a73..7de6137 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -99,7 +99,7 @@ public class KylinConfig {
     public static final String KYLIN_JOB_LOG_DIR = "kylin.job.log.dir";
 
     public static final String KYLIN_JOB_CUBING_IN_MEM = "kylin.job.cubing.inMem";
-    public static final String KYLIN_JOB_CUBING_IN_MEM_SAMPLING_MAX = "kylin.job.cubing.inMem.sampling.max";
+    public static final String KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT = "kylin.job.cubing.inMem.sampling.percent";
 
     public static final String KYLIN_HDFS_WORKING_DIR = "kylin.hdfs.working.dir";
 
@@ -483,8 +483,11 @@ public class KylinConfig {
         return Boolean.parseBoolean(this.getOptional(KYLIN_JOB_CUBING_IN_MEM, "false"));
     }
 
-    public int getCubingInMemSamplingMax() {
-        return Integer.parseInt(this.getOptional(KYLIN_JOB_CUBING_IN_MEM_SAMPLING_MAX, "100000"));
+    public int getCubingInMemSamplingPercent() {
+        int percent = Integer.parseInt(this.getOptional(KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT, "5"));
+        percent = Math.max(percent, 1);
+        percent = Math.min(percent, 100);
+        return percent;
     }
 
     private String getOptional(String prop) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/conf/kylin.properties b/conf/kylin.properties
index 4932405..31e8356 100644
--- a/conf/kylin.properties
+++ b/conf/kylin.properties
@@ -38,7 +38,9 @@ kylin.job.yarn.app.rest.check.interval.seconds=10
 
 # Whether calculate cube in mem in each mapper;
 kylin.job.cubing.inMem=true
-kylin.job.cubing.inMem.sampling.max=100000
+
+# Sampling percentage (1-100) for statistics; set to 25 here, code falls back to 5 when unset
+kylin.job.cubing.inMem.sampling.percent=25
 
 # The cut size for hbase region, in GB.
 # E.g, for cube whose capacity be marked as "SMALL", split region per 5GB by default

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index acc843d..1830eaf 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -36,6 +36,9 @@ kylin.job.concurrent.max.limit=10
 # Whether calculate cube in mem in each mapper;
 kylin.job.cubing.inMem=false
 
+# Sampling percentage (1-100) for statistics; set to 25 here, code falls back to 5 when unset
+kylin.job.cubing.inMem.sampling.percent=25
+
 # The cut size for hbase region, in GB.
 # E.g, for cube whose capacity be marked as "SMALL", split region per 5GB by default
 kylin.job.hbase.region.cut.small=5

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
index e5ca5af..b0444d0 100644
--- a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
+++ b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
@@ -50,7 +50,7 @@ public interface BatchConstants {
 
     public static final String CFG_STATISTICS_ENABLED = "statistics.enabled";
     public static final String CFG_STATISTICS_OUTPUT = "statistics.ouput";
-    public static final String CFG_STATISTICS_SAMPLING_MAX = "statistics.sampling.max";
+    public static final String CFG_STATISTICS_SAMPLING_PERCENT = "statistics.sampling.percent";
     public static final String CFG_STATISTICS_CUBE_ESTIMATION = "cube_statistics.txt";
     public static final String CFG_STATISTICS_CUBOID_ESTIMATION = "cuboid_statistics.seq";
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index afca89a..27613df 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -287,7 +287,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
         appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(inMemoryCubing()));
         appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(seg, jobId));
-        appendExecCmdParameters(cmd, "statisticssamplingmax", String.valueOf(engineConfig.getConfig().getCubingInMemSamplingMax()));
+        appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(engineConfig.getConfig().getCubingInMemSamplingPercent()));
         appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
         appendExecCmdParameters(cmd, "tablename", intermediateHiveTableName);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index f24705c..1835954 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -88,7 +88,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
 
     protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName("statisticsenabled").hasArg().isRequired(false).withDescription("Statistics enabled").create("statisticsenabled");
     protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName("statisticsoutput").hasArg().isRequired(false).withDescription("Statistics output").create("statisticsoutput");
-    protected static final Option OPTION_STATISTICS_SAMPLING_MAX = OptionBuilder.withArgName("statisticssamplingmax").hasArg().isRequired(false).withDescription("Statistics sampling max").create("statisticssamplingmax");
+    protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder.withArgName("statisticssamplingpercent").hasArg().isRequired(false).withDescription("Statistics sampling percentage").create("statisticssamplingpercent");
 
     protected String name;
     protected String description;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index f7fe1ba..9e4b363 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -57,7 +57,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             options.addOption(OPTION_SEGMENT_NAME);
             options.addOption(OPTION_STATISTICS_ENABLED);
             options.addOption(OPTION_STATISTICS_OUTPUT);
-            options.addOption(OPTION_STATISTICS_SAMPLING_MAX);
+            options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
             parseOptions(options, args);
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
@@ -67,7 +67,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
             String statistics_enabled = getOptionValue(OPTION_STATISTICS_ENABLED);
             String statistics_output = getOptionValue(OPTION_STATISTICS_OUTPUT);
-            String statistics_sampling_max = getOptionValue(OPTION_STATISTICS_SAMPLING_MAX);
+            String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
 
             // ----------------------------------------------------------------------------
             // add metadata to distributed cache
@@ -78,7 +78,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
-            job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_MAX, statistics_sampling_max);
+            job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
             log.info("Starting: " + job.getJobName());
 
             setJobClasspath(job);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index e1be38a..dd70117 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -58,7 +58,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
     protected long baseCuboidId;
     protected CubeDesc cubeDesc;
     private long totalRowsBeforeMerge = 0;
-    private double averageSamplingRatio = 1;
+    private int SAMPING_PERCENTAGE = 5;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -80,6 +80,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
             baseCuboidRowCountInMappers = Lists.newArrayList();
             rowCountInCuboids = Maps.newHashMap();
             cuboidHLLMap = Maps.newHashMap();
+            SAMPING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
         }
     }
 
@@ -112,7 +113,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
             // for hll
             long cuboidId = 0 - key.get();
 
-            if (cuboidId <= baseCuboidId) {
                 for (Text value : values) {
                     HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
                     ByteArray byteArray = new ByteArray(value.getBytes());
@@ -130,17 +130,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
                         cuboidHLLMap.put(cuboidId, hll);
                     }
                 }
-            } else {
-                int mapperCount = 0;
-                averageSamplingRatio = 0;
-                for (Text value : values) {
-                    averageSamplingRatio += Bytes.toDouble(value.getBytes());
-                    mapperCount++;
-                }
-
-                averageSamplingRatio = averageSamplingRatio / mapperCount;
-
-            }
         }
 
     }
@@ -173,10 +162,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
 
             msg = "Total cuboid number: \t" + allCuboids.size();
             writeLine(out, msg);
-            msg = "Avg samping ratio: \t" + averageSamplingRatio;
+            msg = "Samping percentage: \t" + SAMPING_PERCENTAGE;
             writeLine(out, msg);
 
-            writeLine(out, "The following statistics are collected based sampling data, not all data (only if samping ratio = 1).");
+            writeLine(out, "The following statistics are collected based sampling data.");
             for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
                 if (baseCuboidRowCountInMappers.get(i) > 0) {
                     msg = "Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i);
@@ -223,7 +212,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
         Collections.sort(allCuboids);
         try {
             for (long i : allCuboids) {
-                writer.append(new LongWritable(i), new LongWritable((long) (rowCountInCuboids.get(i) / averageSamplingRatio)));
+                writer.append(new LongWritable(i), new LongWritable((long) (rowCountInCuboids.get(i) *100 / SAMPING_PERCENTAGE)));
             }
         } finally {
             writer.close();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
index d784cc1..d81bff1 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.kv.RowConstants;
@@ -58,7 +57,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     private List<String> rowArray;
     private HashFunction hf = null;
     private int rowCount = 0;
-    private int MAX_SAMPING_COUNT = 100000;
+    private int SAMPING_PERCENTAGE = 5;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -68,7 +67,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
         rowArray = new ArrayList<String>(schema.getFields().size());
         collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
         if (collectStatistics) {
-            MAX_SAMPING_COUNT = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_MAX, "100000"));
+            SAMPING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
             cuboidScheduler = new CuboidScheduler(cubeDesc);
             nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
 
@@ -128,18 +127,20 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
             handleErrorRecord(record, ex);
         }
 
-        if (collectStatistics && rowCount < MAX_SAMPING_COUNT) {
+        if (collectStatistics && rowCount < SAMPING_PERCENTAGE) {
             putRowKeyToHLL(rowArray);
         }
 
-        rowCount++;
+        if (rowCount++ == 100)
+            rowCount = 0;
     }
 
     private void putRowKeyToHLL(List<String> row) {
         for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
             Hasher hc = hf.newHasher();
             for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
-                hc.putString(row.get(allCuboidsBitSet[i][position]));
+                if (row.get(allCuboidsBitSet[i][position]) != null)
+                    hc.putString(row.get(allCuboidsBitSet[i][position]));
                 hc.putString(",");
             }
 
@@ -161,12 +162,6 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
                 outputValue.set(hllBuf.array(), 0, hllBuf.position());
                 context.write(outputKey, outputValue);
             }
-
-            double samplingRatio = rowCount < MAX_SAMPING_COUNT ? 1.0 : ((double) MAX_SAMPING_COUNT) / rowCount;
-            //output the total hll for this mapper;
-            outputKey.set(0 - baseCuboidId - 1);
-            outputValue.set(Bytes.toBytes(samplingRatio));
-            context.write(outputKey, outputValue);
         }
     }