You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/27 01:48:16 UTC
[kylin] branch master updated: KYLIN-3453 Improve cube size estimation for topn, count distinct
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 1cd0026 KYLIN-3453 Improve cube size estimation for topn,count distinct
1cd0026 is described below
commit 1cd00269a63f51a011f93effe6fc982d3e614459
Author: chao long <wa...@qq.com>
AuthorDate: Thu Jul 26 11:46:25 2018 +0800
KYLIN-3453 Improve cube size estimation for topn,count distinct
---
.../org/apache/kylin/common/KylinConfigBase.java | 16 +++++----
.../kylin/measure/bitmap/BitmapSerializer.java | 14 ++++++++
.../apache/kylin/measure/hllc/HLLCSerializer.java | 12 +++++++
.../org/apache/kylin/measure/hllc/HLLCounter.java | 4 +--
.../kylin/measure/topn/TopNCounterSerializer.java | 17 ++++++++--
.../kylin/engine/mr/common/CubeStatsReader.java | 39 +++++++++++++++++-----
.../kylin/engine/mr/common/CubeStatsWriter.java | 16 +++++----
.../kylin/engine/mr/steps/SaveStatisticsStep.java | 9 ++---
8 files changed, 99 insertions(+), 28 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index b2331e1..43e7d62 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -426,7 +426,6 @@ abstract public class KylinConfigBase implements Serializable {
return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200"));
}
-
// ============================================================================
// CUBE
// ============================================================================
@@ -449,7 +448,11 @@ abstract public class KylinConfigBase implements Serializable {
}
public double getJobCuboidSizeCountDistinctRatio() {
- return Double.parseDouble(getOptional("kylin.cube.size-estimate-countdistinct-ratio", "0.05"));
+ return Double.parseDouble(getOptional("kylin.cube.size-estimate-countdistinct-ratio", "0.5"));
+ }
+
+ public double getJobCuboidSizeTopNRatio() {
+ return Double.parseDouble(getOptional("kylin.cube.size-estimate-topn-ratio", "0.5"));
}
public String getCubeAlgorithm() {
@@ -876,7 +879,7 @@ abstract public class KylinConfigBase implements Serializable {
public Map<String, String> getSqoopConfigOverride() {
return getPropertiesByPrefix("kylin.source.jdbc.sqoop-config-override.");
}
-
+
public String getJdbcSourceFieldDelimiter() {
return getOptional("kylin.source.jdbc.field-delimiter", "|");
}
@@ -1227,11 +1230,11 @@ abstract public class KylinConfigBase implements Serializable {
public Boolean isEnumerableRulesEnabled() {
return Boolean.parseBoolean(getOptional("kylin.query.calcite.enumerable-rules-enabled", "false"));
}
-
+
public boolean isReduceExpressionsRulesEnabled() {
return Boolean.parseBoolean(getOptional("kylin.query.calcite.reduce-rules-enabled", "true"));
}
-
+
public boolean isConvertCreateTableToWith() {
return Boolean.valueOf(getOptional("kylin.query.convert-create-table-to-with", "false"));
}
@@ -1332,12 +1335,13 @@ abstract public class KylinConfigBase implements Serializable {
public int getBadQueryDefaultAlertingSeconds() {
return Integer.parseInt(getOptional("kylin.query.badquery-alerting-seconds", "90"));
}
+
public double getBadQueryDefaultAlertingCoefficient() {
return Double.parseDouble(getOptional("kylin.query.timeout-seconds-coefficient", "0.5"));
}
public int getBadQueryDefaultDetectIntervalSeconds() {
- int time =(int) (getQueryTimeoutSeconds() * getBadQueryDefaultAlertingCoefficient()); // half of query timeout
+ int time = (int) (getQueryTimeoutSeconds() * getBadQueryDefaultAlertingCoefficient()); // half of query timeout
if (time == 0) {
time = 60; // 60 sec
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
index 1c13876..29a25e9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
@@ -30,6 +30,7 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
private static final int IS_RESULT_FLAG = 1;
private static final int RESULT_SIZE = 12;
+ private static final int DEFAULT_MAX_SIZE = 1024;
// called by reflection
public BitmapSerializer(DataType type) {
@@ -86,6 +87,19 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
}
@Override
+ protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) {
+ // MappeableArrayContainer DEFAULT_MAX_SIZE = 4096
+ if (averageNumOfElementsInCounter < DEFAULT_MAX_SIZE) {
+ // 8 = 4 + 4 for SERIAL_COOKIE_NO_RUNCONTAINER + size
+ // size * 8 = 2 * size + 2 * size + 4 * size as keys + values Cardinality + startOffsets
+ // size * 8 for values array
+ return 8 + averageNumOfElementsInCounter * 16;
+ } else {
+ return getStorageBytesEstimate();
+ }
+ }
+
+ @Override
public boolean supportDirectReturnResult() {
return true;
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
index 98bc5cf..9310864 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
@@ -80,4 +80,16 @@ public class HLLCSerializer extends DataTypeSerializer<HLLCounter> {
return current().maxLength();
}
+ @Override
+ protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) {
+ int registerIndexSize = current().getRegisterIndexSize();
+ int m = 1 << precision;
+ if (!current().isDense((int) averageNumOfElementsInCounter)
+ || averageNumOfElementsInCounter < (m - 5) / (1 + registerIndexSize)) {
+ // 5 = 1 + 4 for scheme and size
+ // size * (getRegisterIndexSize + 1)
+ return 5 + averageNumOfElementsInCounter * (registerIndexSize + 1);
+ }
+ return getStorageBytesEstimate();
+ }
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
index b793465..80bbb2a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
@@ -78,7 +78,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
}
}
- private boolean isDense(int size) {
+ public boolean isDense(int size) {
double over = OVERFLOW_FACTOR * m;
return size > (int) over;
}
@@ -358,7 +358,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
return 1 + m;
}
- private int getRegisterIndexSize() {
+ public int getRegisterIndexSize() {
return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
index 77a69cf..eff510f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
@@ -61,12 +61,12 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr
@Override
public int maxLength() {
- return Math.max(precision * TopNCounter.EXTRA_SPACE_RATE * (scale + 8), 1024 * 1024); // use at least 1M
+ return Math.max(precision * TopNCounter.EXTRA_SPACE_RATE * storageBytesEstimatePerCounter(), 1024 * 1024); // use at least 1M
}
@Override
public int getStorageBytesEstimate() {
- return precision * TopNCounter.EXTRA_SPACE_RATE * (scale + 8);
+ return precision * TopNCounter.EXTRA_SPACE_RATE * storageBytesEstimatePerCounter();
}
@Override
@@ -107,4 +107,17 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr
return counter;
}
+ @Override
+ protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) {
+ if (averageNumOfElementsInCounter < precision * TopNCounter.EXTRA_SPACE_RATE) {
+ return averageNumOfElementsInCounter * storageBytesEstimatePerCounter() + 12;
+ } else {
+ return getStorageBytesEstimate();
+ }
+ }
+
+ private int storageBytesEstimatePerCounter() {
+ return (scale + 8);
+ }
+
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 3c054a3..6b8934a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -55,6 +55,7 @@ import org.apache.kylin.cube.kv.CubeDimEncMap;
import org.apache.kylin.cube.kv.RowKeyEncoder;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.topn.TopNMeasureType;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
@@ -79,6 +80,7 @@ public class CubeStatsReader {
final double mapperOverlapRatioOfFirstBuild; // becomes meaningless after merge
final Map<Long, HLLCounter> cuboidRowEstimatesHLL;
final CuboidScheduler cuboidScheduler;
+ final long sourceRowCount;
public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig);
@@ -94,7 +96,7 @@ public class CubeStatsReader {
RawResource resource = store.getResource(statsKey);
if (resource == null)
throw new IllegalStateException("Missing resource at " + statsKey);
-
+
File tmpSeqFile = writeTmpSeqFile(resource.inputStream);
Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
@@ -107,6 +109,7 @@ public class CubeStatsReader {
this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
+ this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
}
/**
@@ -129,6 +132,7 @@ public class CubeStatsReader {
this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
+ this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
}
private File writeTmpSeqFile(InputStream inputStream) throws IOException {
@@ -158,7 +162,7 @@ public class CubeStatsReader {
// return map of Cuboid ID => MB
public Map<Long, Double> getCuboidSizeMap() {
- return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL());
+ return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL(), sourceRowCount);
}
public double estimateCubeSize() {
@@ -184,7 +188,8 @@ public class CubeStatsReader {
return cuboidRowCountMap;
}
- public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) {
+ public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap,
+ long sourceRowCount) {
final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
final List<Integer> rowkeyColumnSize = Lists.newArrayList();
final Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
@@ -199,7 +204,7 @@ public class CubeStatsReader {
Map<Long, Double> sizeMap = Maps.newHashMap();
for (Map.Entry<Long, Long> entry : rowCountMap.entrySet()) {
sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(),
- baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize));
+ baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize, sourceRowCount));
}
return sizeMap;
}
@@ -210,7 +215,7 @@ public class CubeStatsReader {
* @return the cuboid size in M bytes
*/
private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount,
- long baseCuboidId, long baseCuboidCount, List<Integer> rowKeyColumnLength) {
+ long baseCuboidId, long baseCuboidCount, List<Integer> rowKeyColumnLength, long sourceRowCount) {
int rowkeyLength = cubeSegment.getRowKeyPreambleSize();
KylinConfig kylinConf = cubeSegment.getConfig();
@@ -228,12 +233,21 @@ public class CubeStatsReader {
int normalSpace = rowkeyLength;
int countDistinctSpace = 0;
double percentileSpace = 0;
+ int topNSpace = 0;
for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
+ if (rowCount == 0)
+ break;
DataType returnType = measureDesc.getFunction().getReturnDataType();
if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_COUNT_DISTINCT)) {
- countDistinctSpace += returnType.getStorageBytesEstimate();
+ long estimateDistinctCount = sourceRowCount / rowCount;
+ estimateDistinctCount = estimateDistinctCount == 0 ? 1L : estimateDistinctCount;
+ countDistinctSpace += returnType.getStorageBytesEstimate(estimateDistinctCount);
} else if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_PERCENTILE)) {
percentileSpace += returnType.getStorageBytesEstimate(baseCuboidCount * 1.0 / rowCount);
+ } else if (measureDesc.getFunction().getExpression().equals(TopNMeasureType.FUNC_TOP_N)) {
+ long estimateTopNCount = sourceRowCount / rowCount;
+ estimateTopNCount = estimateTopNCount == 0 ? 1L : estimateTopNCount;
+ topNSpace += returnType.getStorageBytesEstimate(estimateTopNCount);
} else {
normalSpace += returnType.getStorageBytesEstimate();
}
@@ -241,9 +255,11 @@ public class CubeStatsReader {
double cuboidSizeRatio = kylinConf.getJobCuboidSizeRatio();
double cuboidSizeMemHungryRatio = kylinConf.getJobCuboidSizeCountDistinctRatio();
+ double cuboidSizeTopNRatio = kylinConf.getJobCuboidSizeTopNRatio();
+
double ret = (1.0 * normalSpace * rowCount * cuboidSizeRatio
- + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount)
- / (1024L * 1024L);
+ + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount
+ + 1.0 * topNSpace * rowCount * cuboidSizeTopNRatio) / (1024L * 1024L);
return ret;
}
@@ -351,6 +367,7 @@ public class CubeStatsReader {
public static class CubeStatsResult {
private int percentage = 100;
private double mapperOverlapRatio = 0;
+ private long sourceRecordCount = 0;
private int mapperNumber = 0;
private Map<Long, HLLCounter> counterMap = Maps.newHashMap();
@@ -367,6 +384,8 @@ public class CubeStatsReader {
mapperOverlapRatio = Bytes.toDouble(value.getBytes());
} else if (key.get() == -2) {
mapperNumber = Bytes.toInt(value.getBytes());
+ } else if (key.get() == -3) {
+ sourceRecordCount = Bytes.toLong(value.getBytes());
} else if (key.get() > 0) {
HLLCounter hll = new HLLCounter(precision);
ByteArray byteArray = new ByteArray(value.getBytes());
@@ -392,6 +411,10 @@ public class CubeStatsReader {
public Map<Long, HLLCounter> getCounterMap() {
return Collections.unmodifiableMap(counterMap);
}
+
+ public long getSourceRecordCount() {
+ return sourceRecordCount;
+ }
}
public static void main(String[] args) throws IOException {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
index f50a4be..c3d6042 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java
@@ -40,14 +40,15 @@ public class CubeStatsWriter {
public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage) throws IOException {
- writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0);
+ writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, 0);
}
public static void writeCuboidStatistics(Configuration conf, Path outputPath, //
- Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) throws IOException {
+ Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
+ long sourceRecordCoun) throws IOException {
Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber,
- mapperOverlapRatio);
+ mapperOverlapRatio, sourceRecordCoun);
}
//Be care of that the file name for partial cuboid statistics should start with BatchConstants.CFG_OUTPUT_STATISTICS,
@@ -57,12 +58,12 @@ public class CubeStatsWriter {
int shard) throws IOException {
Path seqFilePath = new Path(outputPath, BatchConstants.CFG_OUTPUT_STATISTICS + "_" + shard);
writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber,
- mapperOverlapRatio);
+ mapperOverlapRatio, 0);
}
private static void writeCuboidStatisticsInner(Configuration conf, Path outputFilePath, //
- Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio)
- throws IOException {
+ Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio,
+ long sourceRecordCount) throws IOException {
List<Long> allCuboids = Lists.newArrayList();
allCuboids.addAll(cuboidHLLMap.keySet());
Collections.sort(allCuboids);
@@ -80,6 +81,9 @@ public class CubeStatsWriter {
// sampling percentage at key 0
writer.append(new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)));
+ // flat table source_count at key -3
+ writer.append(new LongWritable(-3), new BytesWritable(Bytes.toBytes(sourceRecordCount)));
+
for (long i : allCuboids) {
valueBuf.clear();
cuboidHLLMap.get(i).writeRegisters(valueBuf);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index b532360..1f79539 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -122,12 +122,15 @@ public class SaveStatisticsStep extends AbstractExecutable {
totalRowsBeforeMerge);
}
double mapperOverlapRatio = grantTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grantTotal;
+ CubingJob cubingJob = (CubingJob) getManager()
+ .getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+ long sourceRecordCount = cubingJob.findSourceRecordCount();
CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, cuboidHLLMap, samplingPercentage,
- mapperNumber, mapperOverlapRatio);
+ mapperNumber, mapperOverlapRatio, sourceRecordCount);
Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
logger.info(newSegment + " stats saved to hdfs " + statisticsFile);
-
+
FSDataInputStream is = fs.open(statisticsFile);
try {
// put the statistics to metadata store
@@ -135,8 +138,6 @@ public class SaveStatisticsStep extends AbstractExecutable {
rs.putResource(resPath, is, System.currentTimeMillis());
logger.info(newSegment + " stats saved to resource " + resPath);
- CubingJob cubingJob = (CubingJob) getManager()
- .getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, newSegment);
StatisticsDecisionUtil.optimizeCubingPlan(newSegment);
} finally {