You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:22 UTC
[10/50] [abbrv] incubator-kylin git commit: KYLIN-673 sampling part
data with a configurable percentage.
KYLIN-673: sample a configurable percentage of the input data.
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f2562639
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f2562639
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f2562639
Branch: refs/heads/streaming-localdict
Commit: f25626399b4764310fea7a8876add816c5010196
Parents: 6533a33
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Wed May 6 10:10:45 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Wed May 6 10:11:09 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/KylinConfig.java | 9 ++++++---
conf/kylin.properties | 4 +++-
.../test_case_data/sandbox/kylin.properties | 3 +++
.../kylin/job/constant/BatchConstants.java | 2 +-
.../apache/kylin/job/cube/CubingJobBuilder.java | 2 +-
.../kylin/job/hadoop/AbstractHadoopJob.java | 2 +-
.../job/hadoop/cube/FactDistinctColumnsJob.java | 6 +++---
.../hadoop/cube/FactDistinctColumnsReducer.java | 21 +++++---------------
.../cube/FactDistinctHiveColumnsMapper.java | 19 +++++++-----------
9 files changed, 30 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index f1e5a73..7de6137 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -99,7 +99,7 @@ public class KylinConfig {
public static final String KYLIN_JOB_LOG_DIR = "kylin.job.log.dir";
public static final String KYLIN_JOB_CUBING_IN_MEM = "kylin.job.cubing.inMem";
- public static final String KYLIN_JOB_CUBING_IN_MEM_SAMPLING_MAX = "kylin.job.cubing.inMem.sampling.max";
+ public static final String KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT = "kylin.job.cubing.inMem.sampling.percent";
public static final String KYLIN_HDFS_WORKING_DIR = "kylin.hdfs.working.dir";
@@ -483,8 +483,11 @@ public class KylinConfig {
return Boolean.parseBoolean(this.getOptional(KYLIN_JOB_CUBING_IN_MEM, "false"));
}
- public int getCubingInMemSamplingMax() {
- return Integer.parseInt(this.getOptional(KYLIN_JOB_CUBING_IN_MEM_SAMPLING_MAX, "100000"));
+ public int getCubingInMemSamplingPercent() {
+ int percent = Integer.parseInt(this.getOptional(KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT, "5"));
+ percent = Math.max(percent, 1);
+ percent = Math.min(percent, 100);
+ return percent;
}
private String getOptional(String prop) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/conf/kylin.properties b/conf/kylin.properties
index 4932405..31e8356 100644
--- a/conf/kylin.properties
+++ b/conf/kylin.properties
@@ -38,7 +38,9 @@ kylin.job.yarn.app.rest.check.interval.seconds=10
# Whether calculate cube in mem in each mapper;
kylin.job.cubing.inMem=true
-kylin.job.cubing.inMem.sampling.max=100000
+
+# Percentage of rows (1-100) sampled when collecting cube statistics; set to 25 here (built-in default is 5 if unset)
+kylin.job.cubing.inMem.sampling.percent=25
# The cut size for hbase region, in GB.
# E.g, for cube whose capacity be marked as "SMALL", split region per 5GB by default
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index acc843d..1830eaf 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -36,6 +36,9 @@ kylin.job.concurrent.max.limit=10
# Whether calculate cube in mem in each mapper;
kylin.job.cubing.inMem=false
+# Percentage of rows (1-100) sampled when collecting cube statistics; set to 25 here (built-in default is 5 if unset)
+kylin.job.cubing.inMem.sampling.percent=25
+
# The cut size for hbase region, in GB.
# E.g, for cube whose capacity be marked as "SMALL", split region per 5GB by default
kylin.job.hbase.region.cut.small=5
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
index e5ca5af..b0444d0 100644
--- a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
+++ b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
@@ -50,7 +50,7 @@ public interface BatchConstants {
public static final String CFG_STATISTICS_ENABLED = "statistics.enabled";
public static final String CFG_STATISTICS_OUTPUT = "statistics.ouput";
- public static final String CFG_STATISTICS_SAMPLING_MAX = "statistics.sampling.max";
+ public static final String CFG_STATISTICS_SAMPLING_PERCENT = "statistics.sampling.percent";
public static final String CFG_STATISTICS_CUBE_ESTIMATION = "cube_statistics.txt";
public static final String CFG_STATISTICS_CUBOID_ESTIMATION = "cuboid_statistics.seq";
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index afca89a..27613df 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -287,7 +287,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
appendExecCmdParameters(cmd, "segmentname", seg.getName());
appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(inMemoryCubing()));
appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(seg, jobId));
- appendExecCmdParameters(cmd, "statisticssamplingmax", String.valueOf(engineConfig.getConfig().getCubingInMemSamplingMax()));
+ appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(engineConfig.getConfig().getCubingInMemSamplingPercent()));
appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
appendExecCmdParameters(cmd, "tablename", intermediateHiveTableName);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index f24705c..1835954 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -88,7 +88,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName("statisticsenabled").hasArg().isRequired(false).withDescription("Statistics enabled").create("statisticsenabled");
protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName("statisticsoutput").hasArg().isRequired(false).withDescription("Statistics output").create("statisticsoutput");
- protected static final Option OPTION_STATISTICS_SAMPLING_MAX = OptionBuilder.withArgName("statisticssamplingmax").hasArg().isRequired(false).withDescription("Statistics sampling max").create("statisticssamplingmax");
+ protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder.withArgName("statisticssamplingpercent").hasArg().isRequired(false).withDescription("Statistics sampling percentage").create("statisticssamplingpercent");
protected String name;
protected String description;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index f7fe1ba..9e4b363 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -57,7 +57,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
options.addOption(OPTION_SEGMENT_NAME);
options.addOption(OPTION_STATISTICS_ENABLED);
options.addOption(OPTION_STATISTICS_OUTPUT);
- options.addOption(OPTION_STATISTICS_SAMPLING_MAX);
+ options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
parseOptions(options, args);
job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
@@ -67,7 +67,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
String statistics_enabled = getOptionValue(OPTION_STATISTICS_ENABLED);
String statistics_output = getOptionValue(OPTION_STATISTICS_OUTPUT);
- String statistics_sampling_max = getOptionValue(OPTION_STATISTICS_SAMPLING_MAX);
+ String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
// ----------------------------------------------------------------------------
// add metadata to distributed cache
@@ -78,7 +78,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
- job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_MAX, statistics_sampling_max);
+ job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
log.info("Starting: " + job.getJobName());
setJobClasspath(job);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index e1be38a..dd70117 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -58,7 +58,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
protected long baseCuboidId;
protected CubeDesc cubeDesc;
private long totalRowsBeforeMerge = 0;
- private double averageSamplingRatio = 1;
+ private int SAMPING_PERCENTAGE = 5;
@Override
protected void setup(Context context) throws IOException {
@@ -80,6 +80,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
baseCuboidRowCountInMappers = Lists.newArrayList();
rowCountInCuboids = Maps.newHashMap();
cuboidHLLMap = Maps.newHashMap();
+ SAMPING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
}
}
@@ -112,7 +113,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
// for hll
long cuboidId = 0 - key.get();
- if (cuboidId <= baseCuboidId) {
for (Text value : values) {
HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
ByteArray byteArray = new ByteArray(value.getBytes());
@@ -130,17 +130,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
cuboidHLLMap.put(cuboidId, hll);
}
}
- } else {
- int mapperCount = 0;
- averageSamplingRatio = 0;
- for (Text value : values) {
- averageSamplingRatio += Bytes.toDouble(value.getBytes());
- mapperCount++;
- }
-
- averageSamplingRatio = averageSamplingRatio / mapperCount;
-
- }
}
}
@@ -173,10 +162,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
msg = "Total cuboid number: \t" + allCuboids.size();
writeLine(out, msg);
- msg = "Avg samping ratio: \t" + averageSamplingRatio;
+ msg = "Samping percentage: \t" + SAMPING_PERCENTAGE;
writeLine(out, msg);
- writeLine(out, "The following statistics are collected based sampling data, not all data (only if samping ratio = 1).");
+ writeLine(out, "The following statistics are collected based sampling data.");
for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
if (baseCuboidRowCountInMappers.get(i) > 0) {
msg = "Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i);
@@ -223,7 +212,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text,
Collections.sort(allCuboids);
try {
for (long i : allCuboids) {
- writer.append(new LongWritable(i), new LongWritable((long) (rowCountInCuboids.get(i) / averageSamplingRatio)));
+ writer.append(new LongWritable(i), new LongWritable((long) (rowCountInCuboids.get(i) *100 / SAMPING_PERCENTAGE)));
}
} finally {
writer.close();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f2562639/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
index d784cc1..d81bff1 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.kv.RowConstants;
@@ -58,7 +57,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
private List<String> rowArray;
private HashFunction hf = null;
private int rowCount = 0;
- private int MAX_SAMPING_COUNT = 100000;
+ private int SAMPING_PERCENTAGE = 5;
@Override
protected void setup(Context context) throws IOException {
@@ -68,7 +67,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
rowArray = new ArrayList<String>(schema.getFields().size());
collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
if (collectStatistics) {
- MAX_SAMPING_COUNT = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_MAX, "100000"));
+ SAMPING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
cuboidScheduler = new CuboidScheduler(cubeDesc);
nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
@@ -128,18 +127,20 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
handleErrorRecord(record, ex);
}
- if (collectStatistics && rowCount < MAX_SAMPING_COUNT) {
+ if (collectStatistics && rowCount < SAMPING_PERCENTAGE) {
putRowKeyToHLL(rowArray);
}
- rowCount++;
+ if (rowCount++ == 100)
+ rowCount = 0;
}
private void putRowKeyToHLL(List<String> row) {
for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
Hasher hc = hf.newHasher();
for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
- hc.putString(row.get(allCuboidsBitSet[i][position]));
+ if (row.get(allCuboidsBitSet[i][position]) != null)
+ hc.putString(row.get(allCuboidsBitSet[i][position]));
hc.putString(",");
}
@@ -161,12 +162,6 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
outputValue.set(hllBuf.array(), 0, hllBuf.position());
context.write(outputKey, outputValue);
}
-
- double samplingRatio = rowCount < MAX_SAMPING_COUNT ? 1.0 : ((double) MAX_SAMPING_COUNT) / rowCount;
- //output the total hll for this mapper;
- outputKey.set(0 - baseCuboidId - 1);
- outputValue.set(Bytes.toBytes(samplingRatio));
- context.write(outputKey, outputValue);
}
}