You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/02/17 09:23:48 UTC
[kylin] branch master updated: KYLIN-4185: optimize CuboidSizeMap
by using historical segments
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new a979a49 KYLIN-4185: optimize CuboidSizeMap by using historical segments
a979a49 is described below
commit a979a49932ef0b4bcd48d518559f6df6840e6a22
Author: Zhou Kang <zh...@xiaomi.com>
AuthorDate: Wed Dec 18 21:12:47 2019 +0800
KYLIN-4185: optimize CuboidSizeMap by using historical segments
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 +
.../java/org/apache/kylin/cube/CubeManager.java | 1 +
.../java/org/apache/kylin/cube/CubeSegment.java | 20 +++++
.../java/org/apache/kylin/engine/mr/CubingJob.java | 66 +++++++++++++++
.../kylin/engine/mr/common/CubeStatsReader.java | 99 +++++++++++++++++++++-
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 5 ++
.../mr/steps/UpdateCubeInfoAfterMergeStep.java | 5 ++
7 files changed, 199 insertions(+), 1 deletion(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 44629e6..a65058c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -613,6 +613,10 @@ public abstract class KylinConfigBase implements Serializable {
return getOptional("kylin.cube.segment-advisor", "org.apache.kylin.cube.CubeSegmentAdvisor");
}
+ public boolean enableJobCuboidSizeOptimize() {
+ return Boolean.parseBoolean(getOptional("kylin.cube.size-estimate-enable-optimize", "false"));
+ }
+
public double getJobCuboidSizeRatio() {
return Double.parseDouble(getOptional("kylin.cube.size-estimate-ratio", "0.25"));
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 189f738..d057982 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -837,6 +837,7 @@ public class CubeManager implements IRealizationProvider {
}
CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange);
+ newSegment.setMerged(true);
Segments<CubeSegment> mergingSegments = cubeCopy.getMergingSegments(newSegment);
if (mergingSegments.size() <= 1)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 178fd39..70fb501 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -82,6 +82,10 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
private SegmentStatusEnum status;
@JsonProperty("size_kb")
private long sizeKB;
+ @JsonProperty("is_merged")
+ private boolean isMerged;
+ @JsonProperty("estimate_ratio")
+ private List<Double> estimateRatio;
@JsonProperty("input_records")
private long inputRecords;
@JsonProperty("input_records_size")
@@ -224,6 +228,22 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
this.sizeKB = sizeKB;
}
+    /**
+     * @return the {@code is_merged} flag stored on this segment
+     */
+    public boolean isMerged() {
+        return this.isMerged;
+    }
+
+    /**
+     * Sets the {@code is_merged} flag of this segment.
+     *
+     * @param isMerged new value of the flag
+     */
+    public void setMerged(boolean isMerged) {
+        this.isMerged = isMerged;
+    }
+
+    /**
+     * @return the per-level estimate ratios stored on this segment
+     *         ({@code estimate_ratio}); may be {@code null} when none was stored
+     */
+    public List<Double> getEstimateRatio() {
+        return this.estimateRatio;
+    }
+
+    /**
+     * Stores the per-level estimate ratios ({@code estimate_ratio}) on this segment.
+     *
+     * @param estimateRatio ratios to store; the reference is kept as-is, not copied
+     */
+    public void setEstimateRatio(List<Double> estimateRatio) {
+        this.estimateRatio = estimateRatio;
+    }
+
public long getInputRecords() {
return inputRecords;
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 568392e..5456d55 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -18,6 +18,7 @@
package org.apache.kylin.engine.mr;
+import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
@@ -26,12 +27,18 @@ import java.util.Map;
import java.util.TimeZone;
import java.util.regex.Matcher;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -361,4 +368,63 @@ public class CubingJob extends DefaultChainedExecutable {
return Long.parseLong(findExtraInfoBackward(CUBE_SIZE_BYTES, "0"));
}
+    /**
+     * Computes, for every build level of the segment's cube, the ratio between the
+     * size found under this job's cuboid output directory and the size estimated
+     * from the segment statistics.
+     *
+     * @param seg    segment whose cuboid output is measured
+     * @param config configuration used to read the statistics and to resolve the
+     *               HDFS working directory
+     * @return one entry per level, index 0 up to the scheduler's build level; an
+     *         entry is {@code -1.0} when either the estimated or the real size of
+     *         that level is zero. Returns {@code null} when the statistics cannot
+     *         be read.
+     */
+    public List<Double> findEstimateRatio(CubeSegment seg, KylinConfig config) {
+        CuboidScheduler cuboidScheduler = seg.getCubeInstance().getCuboidScheduler();
+        List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer();
+        int totalLevels = cuboidScheduler.getBuildLevel();
+
+        Map<Long, Double> estimatedSizeMap;
+        try {
+            // "true" requests the original estimate, without the historical correction
+            estimatedSizeMap = new CubeStatsReader(seg, config).getCuboidSizeMap(true);
+        } catch (IOException e) {
+            // pass the exception as the last argument so the cause is logged too
+            logger.warn("Cannot get segment {} estimated size map", seg.getName(), e);
+            return null;
+        }
+
+        String cuboidRootPath = getCuboidRootPath(seg, config);
+        List<Double> result = Lists.newArrayList();
+
+        for (int level = 0; level <= totalLevels; level++) {
+            double levelEstimatedSize = 0;
+            for (Long cuboidId : layeredCuboids.get(level)) {
+                // cuboids without an estimate contribute nothing to the level total
+                Double cuboidSize = estimatedSizeMap.get(cuboidId);
+                if (cuboidSize != null) {
+                    levelEstimatedSize += cuboidSize;
+                }
+            }
+
+            double levelRealSize = getRealSizeByLevel(cuboidRootPath, level);
+
+            if (levelEstimatedSize == 0.0 || levelRealSize == 0.0) {
+                // -1.0 marks "no usable ratio for this level"
+                result.add(level, -1.0);
+            } else {
+                result.add(level, levelRealSize / levelEstimatedSize);
+            }
+        }
+
+        return result;
+    }
+
+
+    /**
+     * Measures the content length of the cuboid output directory of one level.
+     *
+     * @param rootPath cuboid root directory of the job
+     * @param level    build level whose output path is resolved through
+     *                 {@code JobBuilderSupport.getCuboidOutputPathsByLevel}
+     * @return the size in MB as a fractional value, or {@code 0} when the size
+     *         cannot be determined (the failure is logged, not propagated)
+     */
+    private double getRealSizeByLevel(String rootPath, int level) {
+        try {
+            String levelPath = JobBuilderSupport.getCuboidOutputPathsByLevel(rootPath, level);
+            FileSystem fs = HadoopUtil.getFileSystem(levelPath);
+            long lengthInBytes = fs.getContentSummary(new Path(levelPath)).getLength();
+            // floating-point division: an integer division would truncate to whole MB
+            // and report every level smaller than 1 MB as empty
+            return lengthInBytes / (1024.0 * 1024.0);
+        } catch (Exception e) {
+            // best effort: keep the stack trace in the log and report "unknown" as 0
+            logger.warn("get level real size failed.", e);
+            return 0.0;
+        }
+    }
+
+    /**
+     * Builds the cuboid root directory of this job:
+     * {@code <hdfs working dir>/kylin-<job id>/<realization name>/cuboid/}.
+     *
+     * @param seg         segment whose realization name is part of the path
+     * @param kylinConfig configuration providing the HDFS working directory
+     * @return the directory path, always ending with {@code /cuboid/}
+     */
+    private String getCuboidRootPath(CubeSegment seg, KylinConfig kylinConfig) {
+        String workingDir = kylinConfig.getHdfsWorkingDirectory();
+        StringBuilder path = new StringBuilder(workingDir);
+        // make sure exactly one separator follows the working directory
+        if (!workingDir.endsWith("/")) {
+            path.append("/");
+        }
+        path.append("kylin-").append(this.getId());
+        path.append("/").append(seg.getRealization().getName());
+        path.append("/cuboid/");
+        return path.toString();
+    }
+
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index e935173..3c93d05 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -64,6 +64,7 @@ import org.apache.kylin.measure.topn.TopNMeasureType;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -171,7 +172,11 @@ public class CubeStatsReader {
// return map of Cuboid ID => MB
public Map<Long, Double> getCuboidSizeMap() {
- return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL(), sourceRowCount);
+ return getCuboidSizeMap(false);
+ }
+
+ public Map<Long, Double> getCuboidSizeMap(boolean origin) {
+ return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL(), sourceRowCount, origin);
}
public double estimateCubeSize() {
@@ -199,6 +204,11 @@ public class CubeStatsReader {
public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap,
long sourceRowCount) {
+ return getCuboidSizeMapFromRowCount(cubeSegment, rowCountMap, sourceRowCount, true);
+ }
+
+ private static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap,
+ long sourceRowCount, boolean origin) {
final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
final List<Integer> rowkeyColumnSize = Lists.newArrayList();
final Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
@@ -215,9 +225,96 @@ public class CubeStatsReader {
sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(),
baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize, sourceRowCount));
}
+
+ if (origin == false && cubeSegment.getConfig().enableJobCuboidSizeOptimize()) {
+ optimizeSizeMap(sizeMap, cubeSegment);
+ }
+
return sizeMap;
}
+    /**
+     * Harmonic mean of the given values: {@code n / (1/x1 + ... + 1/xn)}.
+     *
+     * @param data values to average; elements must be non-null
+     * @return the harmonic mean, or {@code 1.0} when {@code data} is null or empty
+     */
+    private static Double harmonicMean(List<Double> data) {
+        if (data == null || data.isEmpty()) {
+            return 1.0;
+        }
+        double reciprocalSum = 0.0;
+        for (double value : data) {
+            reciprocalSum += 1.0 / value;
+        }
+        return data.size() / reciprocalSum;
+    }
+
+    /**
+     * Derives one correction ratio per build level from the READY segments of the
+     * cube that carry stored estimate ratios and have the same merged flag as
+     * {@code cubeSegment}.
+     *
+     * @param cubeSegment  segment being estimated; only its merged flag is read
+     * @param cubeInstance cube whose READY segments supply the historical ratios
+     * @param totalLevels  highest level index (inclusive) to produce a ratio for
+     * @return a list with {@code totalLevels + 1} entries, each the harmonic mean of
+     *         the positive ratios recorded for that level ({@code 1.0} for a level
+     *         with none); {@code null} when no segment supplied any positive ratio
+     */
+    private static List<Double> getHistoricalRating(CubeSegment cubeSegment,
+            CubeInstance cubeInstance,
+            int totalLevels) {
+        boolean isMerged = cubeSegment.isMerged();
+
+        Map<Integer, List<Double>> layerRatio = Maps.newHashMap();
+
+        for (CubeSegment seg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
+            List<Double> segRatios = seg.getEstimateRatio();
+            if (seg.isMerged() != isMerged || segRatios == null) {
+                continue;
+            }
+
+            logger.info("get ratio from {} with: {}", seg.getName(), StringUtils.join(segRatios, ","));
+
+            // a stored list may hold fewer levels than the current cuboid layout;
+            // only read the levels it actually has
+            int lastUsableLevel = Math.min(totalLevels, segRatios.size() - 1);
+            for (int level = 0; level <= lastUsableLevel; level++) {
+                Double ratio = segRatios.get(level);
+                // non-positive values mark levels without a usable ratio
+                if (ratio == null || ratio <= 0) {
+                    continue;
+                }
+
+                List<Double> ratiosOfLevel = layerRatio.get(level);
+                if (ratiosOfLevel == null) {
+                    ratiosOfLevel = Lists.newArrayList();
+                    layerRatio.put(level, ratiosOfLevel);
+                }
+                ratiosOfLevel.add(ratio);
+            }
+        }
+
+        if (layerRatio.isEmpty()) {
+            logger.info("Fail to get historical rating.");
+            return null;
+        }
+
+        List<Double> result = Lists.newArrayList();
+        for (int level = 0; level <= totalLevels; level++) {
+            logger.debug("level {}: {}", level, StringUtils.join(layerRatio.get(level), ","));
+            // harmonicMean yields 1.0 for a level with no recorded ratios
+            result.add(level, harmonicMean(layerRatio.get(level)));
+        }
+
+        logger.info("Finally estimate ratio is {}", StringUtils.join(result, ","));
+
+        return result;
+    }
+
+    /**
+     * Rescales the estimated cuboid sizes in place, level by level, with the
+     * ratios obtained from {@code getHistoricalRating}. The map is left untouched
+     * when no historical ratio is available.
+     *
+     * @param sizeMap     cuboid id to estimated size; modified in place. A cuboid
+     *                    of the layered layout that has no entry gets one with
+     *                    value {@code 0.0 * rate}.
+     * @param cubeSegment segment whose cube supplies the layered cuboid layout and
+     *                    the historical segments
+     */
+    private static void optimizeSizeMap(Map<Long, Double> sizeMap, CubeSegment cubeSegment) {
+        CubeInstance cubeInstance = cubeSegment.getCubeInstance();
+        int totalLevels = cubeInstance.getCuboidScheduler().getBuildLevel();
+        List<List<Long>> layeredCuboids = cubeInstance.getCuboidScheduler().getCuboidsByLayer();
+
+        logger.info("cube size is {} before optimize", SumHelper.sumDouble(sizeMap.values()));
+
+        List<Double> levelRating = getHistoricalRating(cubeSegment, cubeInstance, totalLevels);
+        if (levelRating == null) {
+            logger.info("Fail to optimize, use origin.");
+            return;
+        }
+
+        int level = 0;
+        while (level <= totalLevels) {
+            Double rate = levelRating.get(level);
+            for (Long cuboidId : layeredCuboids.get(level)) {
+                Double current = sizeMap.get(cuboidId);
+                double oriValue = (current == null) ? 0.0 : current;
+                sizeMap.put(cuboidId, oriValue * rate);
+            }
+            level++;
+        }
+
+        logger.info("cube size is {} after optimize", SumHelper.sumDouble(sizeMap.values()));
+    }
+
+
/**
* Estimate the cuboid's size
*
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index c2031da..2f13fdb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -30,6 +30,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
@@ -76,11 +77,15 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
long sourceSizeBytes = cubingJob.findSourceSizeBytes();
long cubeSizeBytes = cubingJob.findCubeSizeBytes();
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ List<Double> cuboidEstimateRatio = cubingJob.findEstimateRatio(segment, config);
+
segment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
segment.setLastBuildTime(System.currentTimeMillis());
segment.setSizeKB(cubeSizeBytes / 1024);
segment.setInputRecords(sourceCount);
segment.setInputRecordsSize(sourceSizeBytes);
+ segment.setEstimateRatio(cuboidEstimateRatio);
try {
saveExtSnapshotIfNeeded(cubeManager, cube, segment);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index e7b127e..0a8cd1e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -92,6 +93,9 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
}
}
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ List<Double> cuboidEstimateRatio = cubingJob.findEstimateRatio(mergedSegment, config);
+
// update segment info
mergedSegment.setSizeKB(cubeSizeBytes / 1024);
mergedSegment.setInputRecords(sourceCount);
@@ -100,6 +104,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
mergedSegment.setLastBuildTime(System.currentTimeMillis());
mergedSegment.setDimensionRangeInfoMap(mergedSegDimRangeMap);
mergedSegment.setStreamSourceCheckpoint(lastMergedSegment != null ? lastMergedSegment.getStreamSourceCheckpoint() : null);
+ mergedSegment.setEstimateRatio(cuboidEstimateRatio);
if (isOffsetCube) {
SegmentRange.TSRange tsRange = new SegmentRange.TSRange(tsStartMin, tsEndMax);