You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/02/17 09:23:48 UTC

[kylin] branch master updated: KYLIN-4185: optimize CuboidSizeMap by using historical segments

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new a979a49  KYLIN-4185: optimize CuboidSizeMap by using historical segments
a979a49 is described below

commit a979a49932ef0b4bcd48d518559f6df6840e6a22
Author: Zhou Kang <zh...@xiaomi.com>
AuthorDate: Wed Dec 18 21:12:47 2019 +0800

    KYLIN-4185: optimize CuboidSizeMap by using historical segments
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 +
 .../java/org/apache/kylin/cube/CubeManager.java    |  1 +
 .../java/org/apache/kylin/cube/CubeSegment.java    | 20 +++++
 .../java/org/apache/kylin/engine/mr/CubingJob.java | 66 +++++++++++++++
 .../kylin/engine/mr/common/CubeStatsReader.java    | 99 +++++++++++++++++++++-
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java     |  5 ++
 .../mr/steps/UpdateCubeInfoAfterMergeStep.java     |  5 ++
 7 files changed, 199 insertions(+), 1 deletion(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 44629e6..a65058c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -613,6 +613,10 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("kylin.cube.segment-advisor", "org.apache.kylin.cube.CubeSegmentAdvisor");
     }
 
+    public boolean enableJobCuboidSizeOptimize() {
+        return Boolean.parseBoolean(getOptional("kylin.cube.size-estimate-enable-optimize", "false"));
+    }
+
     public double getJobCuboidSizeRatio() {
         return Double.parseDouble(getOptional("kylin.cube.size-estimate-ratio", "0.25"));
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 189f738..d057982 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -837,6 +837,7 @@ public class CubeManager implements IRealizationProvider {
             }
 
             CubeSegment newSegment = newSegment(cubeCopy, tsRange, segRange);
+            newSegment.setMerged(true);
 
             Segments<CubeSegment> mergingSegments = cubeCopy.getMergingSegments(newSegment);
             if (mergingSegments.size() <= 1)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 178fd39..70fb501 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -82,6 +82,10 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     private SegmentStatusEnum status;
     @JsonProperty("size_kb")
     private long sizeKB;
+    @JsonProperty("is_merged")
+    private boolean isMerged;
+    @JsonProperty("estimate_ratio")
+    private List<Double> estimateRatio;
     @JsonProperty("input_records")
     private long inputRecords;
     @JsonProperty("input_records_size")
@@ -224,6 +228,22 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
         this.sizeKB = sizeKB;
     }
 
+    public boolean isMerged() {
+        return isMerged;
+    }
+
+    public void setMerged(boolean isMerged) {
+        this.isMerged = isMerged;
+    }
+
+    public List<Double> getEstimateRatio() {
+        return estimateRatio;
+    }
+
+    public void setEstimateRatio(List<Double> estimateRatio) {
+        this.estimateRatio = estimateRatio;
+    }
+
     public long getInputRecords() {
         return inputRecords;
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 568392e..5456d55 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.engine.mr;
 
+import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
@@ -26,12 +27,18 @@ import java.util.Map;
 import java.util.TimeZone;
 import java.util.regex.Matcher;
 
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -361,4 +368,63 @@ public class CubingJob extends DefaultChainedExecutable {
         return Long.parseLong(findExtraInfoBackward(CUBE_SIZE_BYTES, "0"));
     }
 
+    public List<Double> findEstimateRatio(CubeSegment seg, KylinConfig config) {
+        CubeInstance cubeInstance = seg.getCubeInstance();
+        CuboidScheduler cuboidScheduler = cubeInstance.getCuboidScheduler();
+        List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer();
+        int totalLevels = cuboidScheduler.getBuildLevel();
+
+        List<Double> result = Lists.newArrayList();
+
+        Map<Long, Double> estimatedSizeMap;
+
+        String cuboidRootPath = getCuboidRootPath(seg, config);
+
+        try {
+            estimatedSizeMap = new CubeStatsReader(seg, config).getCuboidSizeMap(true);
+        } catch (IOException e) {
+            logger.warn("Cannot get segment {} estimated size map", seg.getName());
+
+            return null;
+        }
+
+        for (int level = 0; level <= totalLevels; level++) {
+            double levelEstimatedSize = 0;
+            for (Long cuboidId : layeredCuboids.get(level)) {
+                levelEstimatedSize += estimatedSizeMap.get(cuboidId) == null ? 0.0 : estimatedSizeMap.get(cuboidId);
+            }
+
+            double levelRealSize = getRealSizeByLevel(cuboidRootPath, level);
+
+            if (levelEstimatedSize == 0.0 || levelRealSize == 0.0){
+                result.add(level, -1.0);
+            } else {
+                result.add(level, levelRealSize / levelEstimatedSize);
+            }
+        }
+
+        return result;
+    }
+
+
+    private double getRealSizeByLevel(String rootPath, int level) {
+        try {
+            String levelPath = JobBuilderSupport.getCuboidOutputPathsByLevel(rootPath, level);
+            FileSystem fs = HadoopUtil.getFileSystem(levelPath);
+            return fs.getContentSummary(new Path(levelPath)).getLength() / (1024L * 1024L);
+        } catch (Exception e) {
+            logger.warn("get level real size failed." + e);
+            return 0L;
+        }
+    }
+
+    private String getCuboidRootPath(CubeSegment seg, KylinConfig kylinConfig) {
+        String rootDir = kylinConfig.getHdfsWorkingDirectory();
+        if (!rootDir.endsWith("/")) {
+            rootDir = rootDir + "/";
+        }
+        String jobID = this.getId();
+        return rootDir + "kylin-" + jobID + "/" + seg.getRealization().getName() + "/cuboid/";
+    }
+
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index e935173..3c93d05 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -64,6 +64,7 @@ import org.apache.kylin.measure.topn.TopNMeasureType;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -171,7 +172,11 @@ public class CubeStatsReader {
 
     // return map of Cuboid ID => MB
     public Map<Long, Double> getCuboidSizeMap() {
-        return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL(), sourceRowCount);
+        return getCuboidSizeMap(false);
+    }
+
+    public Map<Long, Double> getCuboidSizeMap(boolean origin) {
+        return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL(), sourceRowCount, origin);
     }
 
     public double estimateCubeSize() {
@@ -199,6 +204,11 @@ public class CubeStatsReader {
 
     public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap,
             long sourceRowCount) {
+        return getCuboidSizeMapFromRowCount(cubeSegment, rowCountMap, sourceRowCount, true);
+    }
+
+    private static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap,
+                                                                  long sourceRowCount, boolean origin) {
         final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
         final List<Integer> rowkeyColumnSize = Lists.newArrayList();
         final Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
@@ -215,9 +225,96 @@ public class CubeStatsReader {
             sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(),
                     baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize, sourceRowCount));
         }
+
+        if (origin == false && cubeSegment.getConfig().enableJobCuboidSizeOptimize()) {
+            optimizeSizeMap(sizeMap, cubeSegment);
+        }
+
         return sizeMap;
     }
 
+    private static Double harmonicMean(List<Double> data) {
+        if (data == null || data.size() == 0) {
+            return 1.0;
+        }
+        Double sum = 0.0;
+        for (Double item : data) {
+            sum += 1.0 / item;
+        }
+        return data.size() / sum;
+    }
+
+    private static List<Double> getHistoricalRating(CubeSegment cubeSegment,
+                                                    CubeInstance cubeInstance,
+                                                    int totalLevels) {
+        boolean isMerged = cubeSegment.isMerged();
+
+        Map<Integer, List<Double>> layerRatio = Maps.newHashMap();
+        List<Double> result = Lists.newArrayList();
+
+        for (CubeSegment seg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
+            if (seg.isMerged() != isMerged || seg.getEstimateRatio() == null) {
+                continue;
+            }
+
+            logger.info("get ratio from {} with: {}", seg.getName(), StringUtils.join(seg.getEstimateRatio(), ","));
+
+            for(int level = 0; level <= totalLevels; level++) {
+                if (seg.getEstimateRatio().get(level) <= 0) {
+                    continue;
+                }
+
+                List<Double> temp = layerRatio.get(level) == null ? Lists.newArrayList() : layerRatio.get(level);
+
+                temp.add(seg.getEstimateRatio().get(level));
+                layerRatio.put(level, temp);
+            }
+        }
+
+        if (layerRatio.size() == 0) {
+            logger.info("Fail to get historical rating.");
+            return null;
+        } else {
+            for(int level = 0; level <= totalLevels; level++) {
+                logger.debug("level {}: {}", level, StringUtils.join(layerRatio.get(level), ","));
+                result.add(level, harmonicMean(layerRatio.get(level)));
+            }
+
+            logger.info("Finally estimate ratio is {}", StringUtils.join(result, ","));
+
+            return result;
+        }
+    }
+
+    private static void optimizeSizeMap(Map<Long, Double> sizeMap, CubeSegment cubeSegment) {
+        CubeInstance cubeInstance = cubeSegment.getCubeInstance();
+        int totalLevels = cubeInstance.getCuboidScheduler().getBuildLevel();
+        List<List<Long>> layeredCuboids = cubeInstance.getCuboidScheduler().getCuboidsByLayer();
+
+        logger.info("cube size is {} before optimize", SumHelper.sumDouble(sizeMap.values()));
+
+        List<Double> levelRating = getHistoricalRating(cubeSegment, cubeInstance, totalLevels);
+
+        if (levelRating == null) {
+            logger.info("Fail to optimize, use origin.");
+            return;
+        }
+
+        for (int level = 0; level <= totalLevels; level++) {
+            Double rate = levelRating.get(level);
+
+            for (Long cuboidId : layeredCuboids.get(level)) {
+                double oriValue = (sizeMap.get(cuboidId) == null ? 0.0 : sizeMap.get(cuboidId));
+                sizeMap.put(cuboidId, oriValue * rate);
+            }
+        }
+
+        logger.info("cube size is {} after optimize", SumHelper.sumDouble(sizeMap.values()));
+
+        return;
+    }
+
+
     /**
      * Estimate the cuboid's size
      *
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index c2031da..2f13fdb 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -30,6 +30,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
@@ -76,11 +77,15 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         long sourceSizeBytes = cubingJob.findSourceSizeBytes();
         long cubeSizeBytes = cubingJob.findCubeSizeBytes();
 
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        List<Double> cuboidEstimateRatio = cubingJob.findEstimateRatio(segment, config);
+
         segment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
         segment.setLastBuildTime(System.currentTimeMillis());
         segment.setSizeKB(cubeSizeBytes / 1024);
         segment.setInputRecords(sourceCount);
         segment.setInputRecordsSize(sourceSizeBytes);
+        segment.setEstimateRatio(cuboidEstimateRatio);
 
         try {
             saveExtSnapshotIfNeeded(cubeManager, cube, segment);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index e7b127e..0a8cd1e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -92,6 +93,9 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
             }
         }
 
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        List<Double> cuboidEstimateRatio = cubingJob.findEstimateRatio(mergedSegment, config);
+
         // update segment info
         mergedSegment.setSizeKB(cubeSizeBytes / 1024);
         mergedSegment.setInputRecords(sourceCount);
@@ -100,6 +104,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         mergedSegment.setLastBuildTime(System.currentTimeMillis());
         mergedSegment.setDimensionRangeInfoMap(mergedSegDimRangeMap);
         mergedSegment.setStreamSourceCheckpoint(lastMergedSegment != null ? lastMergedSegment.getStreamSourceCheckpoint() : null);
+        mergedSegment.setEstimateRatio(cuboidEstimateRatio);
 
         if (isOffsetCube) {
             SegmentRange.TSRange tsRange = new SegmentRange.TSRange(tsStartMin, tsEndMax);