Posted to commits@kylin.apache.org by xx...@apache.org on 2021/01/04 06:05:07 UTC

[kylin] 04/06: KYLIN-4818 Performance profile for CuboidStatisticsJob

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit d8e5db8457bf5bbd1ae05b9360c5ae48038b57fb
Author: XiaoxiangYu <xx...@apache.org>
AuthorDate: Mon Dec 7 21:30:43 2020 +0800

    KYLIN-4818 Performance profile for CuboidStatisticsJob
---
 .../kylin/engine/mr/common/CubeStatsReader.java    | 11 +++++---
 .../engine/mr/common/CuboidRecommenderUtil.java    |  6 ++--
 .../org/apache/kylin/common/KylinConfigBase.java   | 20 ++++++++++++--
 .../src/main/resources/kylin-defaults.properties   |  2 +-
 .../cube/cuboid/algorithm/CuboidRecommender.java   |  3 +-
 .../cuboid/algorithm/greedy/GreedyAlgorithm.java   | 16 +++++------
 .../org/apache/kylin/cube/kv/CubeDimEncMap.java    |  2 ++
 .../kylin/engine/spark/job/NSparkExecutable.java   |  7 +++--
 .../kylin/engine/spark/job/CubeBuildJob.java       |  6 +++-
 .../engine/spark/job/CuboidStatisticsJob.scala     | 32 +++++++++++++++++++---
 10 files changed, 80 insertions(+), 25 deletions(-)

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index e63fc1a..1a1dd11 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -171,7 +171,6 @@ public class CubeStatsReader {
         return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage);
     }
 
-    // return map of Cuboid ID => MB
     public Map<Long, Double> getCuboidSizeMap() {
         return getCuboidSizeMap(false);
     }
@@ -218,7 +217,12 @@ public class CubeStatsReader {
         final Long baseCuboidRowCount = rowCountMap.get(baseCuboid.getId());
 
         for (int i = 0; i < columnList.size(); i++) {
-            rowkeyColumnSize.add(dimEncMap.get(columnList.get(i)).getLengthOfEncoding());
+            /*
+             * A workaround: Kylin 4 does not support self-defined dimension encoding;
+             * encoding is delegated to Parquet (https://github.com/apache/parquet-format/blob/master/Encodings.md).
+             * Calculating the real size of a specific literal value is complex, so 4 bytes serve as a rough estimate.
+             */
+            rowkeyColumnSize.add(4);
         }
 
         Map<Long, Double> sizeMap = Maps.newHashMap();
@@ -360,11 +364,10 @@ public class CubeStatsReader {
             }
         }
 
-        double cuboidSizeRatio = kylinConf.getJobCuboidSizeRatio();
         double cuboidSizeMemHungryRatio = kylinConf.getJobCuboidSizeCountDistinctRatio();
         double cuboidSizeTopNRatio = kylinConf.getJobCuboidSizeTopNRatio();
 
-        double ret = (1.0 * normalSpace * rowCount * cuboidSizeRatio
+        double ret = (1.0 * normalSpace * rowCount
                 + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount
                 + 1.0 * topNSpace * rowCount * cuboidSizeTopNRatio) / (1024L * 1024L);
         return ret;
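
For context, a minimal sketch of the revised estimate (names and signature are illustrative assumptions, not the real CubeStatsReader API): every dimension now contributes a flat 4 bytes, the old kylin.cube.size-estimate-ratio factor no longer scales the normal space term, and the memory-hungry measure spaces keep their ratios before the byte total is converted to MB.

    // Illustrative sketch only -- not Kylin's actual method.
    static double estimateCuboidSizeMB(long rowCount, int dimensionCount,
            double normalMeasureBytes, double countDistinctBytes,
            double percentileBytes, double topNBytes,
            double memHungryRatio, double topNRatio) {
        // dimensions are a flat 4 bytes each; encoding is left to Parquet
        double normalSpace = 4.0 * dimensionCount + normalMeasureBytes;
        double bytes = rowCount * (normalSpace
                + countDistinctBytes * memHungryRatio
                + percentileBytes
                + topNBytes * topNRatio);
        return bytes / (1024L * 1024L); // bytes -> MB
    }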
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
index 6d9b748..f6ae332 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
@@ -60,8 +60,10 @@ public class CuboidRecommenderUtil {
         Set<Long> mandatoryCuboids = segment.getCubeDesc().getMandatoryCuboids();
 
         String key = cube.getName();
-        CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(),
-                cubeStatsReader.getCuboidSizeMap()).setMandatoryCuboids(mandatoryCuboids).setBPUSMinBenefitRatio(segment.getConfig().getCubePlannerBPUSMinBenefitRatio()).build();
+        CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(), cubeStatsReader.getCuboidSizeMap())
+                .setMandatoryCuboids(mandatoryCuboids)
+                .setBPUSMinBenefitRatio(segment.getConfig().getCubePlannerBPUSMinBenefitRatio())
+                .build();
         return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, segment.getConfig(),
                 !mandatoryCuboids.isEmpty());
     }
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index cb0d863..dc91a7f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -715,11 +715,12 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.cube.size-estimate-enable-optimize", "false"));
     }
 
+    @ConfigTag({ConfigTag.Tag.DEPRECATED, ConfigTag.Tag.NOT_CLEAR})
     public double getJobCuboidSizeRatio() {
         return Double.parseDouble(getOptional("kylin.cube.size-estimate-ratio", "0.25"));
     }
 
-    @Deprecated
+    @ConfigTag({ConfigTag.Tag.DEPRECATED, ConfigTag.Tag.NOT_CLEAR})
     public double getJobCuboidSizeMemHungryRatio() {
         return Double.parseDouble(getOptional("kylin.cube.size-estimate-memhungry-ratio", "0.05"));
     }
@@ -801,7 +802,7 @@ public abstract class KylinConfigBase implements Serializable {
     // ============================================================================
 
     public boolean isCubePlannerEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.cube.cubeplanner.enabled", TRUE));
+        return Boolean.parseBoolean(getOptional("kylin.cube.cubeplanner.enabled", FALSE));
     }
 
     public boolean isCubePlannerEnabledForExistingCube() {
@@ -833,6 +834,14 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     /**
+     * Columnar storage formats, like Apache Parquet, often use encoding and compression
+     * to make data smaller, and this affects the CuboidRecommendAlgorithm.
+     */
+    public double getStorageCompressionRatio() {
+        return Double.parseDouble(getOptional("kylin.cube.cubeplanner.storage.compression.ratio", "0.2"));
+    }
+
+    /**
      * get the assigned server array, which is an empty string array by default
      * @return
      */
@@ -2638,6 +2647,13 @@ public abstract class KylinConfigBase implements Serializable {
         return getFileName(kylinHome + File.separator + "lib", PARQUET_JOB_JAR_NAME_PATTERN);
     }
 
+    /**
+     * Use https://github.com/spektom/spark-flamegraph for Spark profiling.
+     */
+    public String getSparkSubmitCmd() {
+        return getOptional("kylin.engine.spark-cmd", null);
+    }
+
     public void overrideKylinParquetJobJarPath(String path) {
         logger.info("override {} to {}", KYLIN_ENGINE_PARQUET_JOB_JAR, path);
         System.setProperty(KYLIN_ENGINE_PARQUET_JOB_JAR, path);
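
To hook in spark-flamegraph, point kylin.engine.spark-cmd at its spark-submit wrapper script; a hypothetical kylin.properties entry (script name and install path are assumptions for illustration):

    # hypothetical path; the wrapper stands in for $SPARK_HOME/bin/spark-submit
    kylin.engine.spark-cmd=/opt/spark-flamegraph/spark-submit-flamegraph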
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index e33dbe9..c5cd317 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -153,7 +153,7 @@ kylin.cube.algorithm.inmem-auto-optimize=true
 
 kylin.cube.aggrgroup.max-combination=32768
 
-kylin.cube.cubeplanner.enabled=true
+kylin.cube.cubeplanner.enabled=false
 kylin.cube.cubeplanner.enabled-for-existing-cube=false
 kylin.cube.cubeplanner.expansion-threshold=15.0
 kylin.cube.cubeplanner.recommend-cache-max-size=200
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
index 1f3eaaf..c5f7101 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
@@ -143,7 +143,8 @@ public class CuboidRecommender {
 
         long startTime = System.currentTimeMillis();
         logger.info("Cube Planner Algorithm started at {}", startTime);
-        List<Long> recommendCuboidList = algorithm.recommend(kylinConf.getCubePlannerExpansionRateThreshold());
+        List<Long> recommendCuboidList = algorithm.recommend(
+                kylinConf.getCubePlannerExpansionRateThreshold() / kylinConf.getStorageCompressionRatio());
         logger.info("Cube Planner Algorithm ended at {}", System.currentTimeMillis() - startTime);
 
         if (recommendCuboidList.size() < allCuboidCount) {
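
To make the scaling concrete, here is the arithmetic with the defaults that appear elsewhere in this commit (15.0 in kylin-defaults.properties, 0.2 in KylinConfigBase):

    // expansion-threshold 15.0 divided by compression ratio 0.2
    double effectiveExpansionRate = 15.0 / 0.2; // = 75.0

That is, the planner may select cuboids totalling 75x the base cuboid's uncompressed size estimate, on the expectation that Parquet encoding and compression shrink the stored data to roughly one fifth.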
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/greedy/GreedyAlgorithm.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/greedy/GreedyAlgorithm.java
index 0a48eea..34b73b9 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/greedy/GreedyAlgorithm.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/greedy/GreedyAlgorithm.java
@@ -92,8 +92,8 @@ public class GreedyAlgorithm extends AbstractRecommendAlgorithm {
                     remaining.remove(best.getCuboidId());
                     benefitPolicy.propagateAggregationCost(best.getCuboidId(), selected);
                     round++;
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("Recommend in round {} : {}", round, best);
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Recommend in round {} : {}", round, best);
                     }
                 } else {
                     doesRemainSpace = false;
@@ -111,15 +111,15 @@ public class GreedyAlgorithm extends AbstractRecommendAlgorithm {
                 "There should be no intersection between excluded list and selected list.");
         logger.info("Greedy Algorithm finished.");
 
-        if (logger.isTraceEnabled()) {
-            logger.trace("Excluded cuboidId size: {}", excluded.size());
-            logger.trace("Excluded cuboidId detail:");
+        if (logger.isDebugEnabled()) {
+            logger.debug("Excluded cuboidId size: {}", excluded.size());
+            logger.debug("Excluded cuboidId detail:");
             for (Long cuboid : excluded) {
-                logger.trace("cuboidId {} and Cost: {} and Space: {}", cuboid,
+                logger.debug("cuboidId {} and Cost: {} and Space: {}", cuboid,
                         cuboidStats.getCuboidQueryCost(cuboid), cuboidStats.getCuboidSize(cuboid));
             }
-            logger.trace("Total Space: {}", spaceLimit - remainingSpace);
-            logger.trace("Space Expansion Rate: {}", (spaceLimit - remainingSpace) / cuboidStats.getBaseCuboidSize());
+            logger.debug("Total Space: {}", spaceLimit - remainingSpace);
+            logger.debug("Space Expansion Rate: {}", (spaceLimit - remainingSpace) / cuboidStats.getBaseCuboidSize());
         }
         return Lists.newArrayList(selected);
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
index cea9db1..7342a69 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
@@ -20,6 +20,7 @@ package org.apache.kylin.cube.kv;
 
 import java.util.Map;
 
+import org.apache.kylin.common.annotation.Clarification;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 
+@Clarification(deprecated = true, msg = "Useless code in Kylin 4")
 public class CubeDimEncMap implements IDimensionEncodingMap, java.io.Serializable {
 
     private static final Logger logger = LoggerFactory.getLogger(CubeDimEncMap.class);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 393c10d..4558cf0 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -347,7 +347,10 @@ public class NSparkExecutable extends AbstractExecutable {
     protected String generateSparkCmd(KylinConfig config, String hadoopConf, String jars, String kylinJobJar,
                                       String appArgs) {
         StringBuilder sb = new StringBuilder();
-        sb.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.engine.spark.application.SparkEntry ");
+
+        String sparkSubmitCmd = config.getSparkSubmitCmd() != null ?
+                config.getSparkSubmitCmd() : KylinConfig.getSparkHome() + "/bin/spark-submit";
+        sb.append("export HADOOP_CONF_DIR=%s && %s --class org.apache.kylin.engine.spark.application.SparkEntry ");
 
         Map<String, String> sparkConfs = getSparkConfigOverride(config);
         for (Entry<String, String> entry : sparkConfs.entrySet()) {
@@ -362,7 +365,7 @@ public class NSparkExecutable extends AbstractExecutable {
         sb.append("--files ").append(config.sparkUploadFiles()).append(" ");
         sb.append("--name job_step_%s ");
         sb.append("--jars %s %s %s");
-        String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, KylinConfig.getSparkHome(), getId(), jars, kylinJobJar,
+        String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, sparkSubmitCmd, getId(), jars, kylinJobJar,
                 appArgs);
         // SparkConf still has a chance to be changed in CubeBuildJob.java (Spark Driver)
         logger.info("spark submit cmd: {}", cmd);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 7df8cf0..9504ac8 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -22,11 +22,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -140,7 +142,9 @@ public class CubeBuildJob extends SparkApplication {
             // 1.2 Save cuboid statistics
             String jobWorkingDirPath = JobBuilderSupport.getJobWorkingDir(cubeInstance.getConfig().getHdfsWorkingDirectory(), jobId);
             Path statisticsDir = new Path(jobWorkingDirPath + "/" + firstSegmentId + "/" + CFG_OUTPUT_STATISTICS);
-            CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), statisticsDir, hllMap, 1);
+            Optional<HLLCounter> hll = hllMap.values().stream().max(Comparator.comparingLong(HLLCounter::getCountEstimate));
+            long rc = hll.map(HLLCounter::getCountEstimate).orElse(1L);
+            CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), statisticsDir, hllMap, 1, rc);
 
             FileSystem fs = HadoopUtil.getWorkingFileSystem();
             ResourceStore rs = ResourceStore.getStore(config);
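
The rationale for taking the maximum: hllMap maps each cuboid ID to its HLL counter, and the base cuboid (all row key columns) normally carries the largest distinct-value estimate, so it is the closest available proxy for the segment's source row count. With invented numbers:

    // hypothetical estimates: base cuboid -> 1,200,000; a two-dimension cuboid -> 350,000
    // max() picks 1,200,000 as the source row count; an empty map falls back to 1L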
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
index c08a6ae..3b963e4 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
@@ -49,41 +49,56 @@ object CuboidStatisticsJob {
 
 class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
   private val info = mutable.Map[String, AggInfo]()
-  val allCuboidsBitSet: Array[Array[Integer]] = getCuboidBitSet(ids, rkc)
+  private var allCuboidsBitSet: Array[Array[Integer]] = Array()
   private val hf: HashFunction = Hashing.murmur3_128
   private val rowHashCodesLong = new Array[Long](rkc)
   private var idx = 0
+  private var meter1 = 0L // cumulative millis spent hashing row key columns
+  private var meter2 = 0L // cumulative millis spent updating per-cuboid HLL counters
+  private var startMills = 0L
+  private var endMills = 0L
+
 
   def statisticsWithinPartition(rows: Iterator[Row]): Iterator[AggInfo] = {
     init()
+    println("CuboidStatisticsJob-statisticsWithinPartition1-" + System.currentTimeMillis())
     rows.foreach(update)
+    printStat()
+    println("CuboidStatisticsJob-statisticsWithinPartition2-" + System.currentTimeMillis())
     info.valuesIterator
   }
 
   def init(): Unit = {
+    println("CuboidStatisticsJob-Init1-" + System.currentTimeMillis())
+    allCuboidsBitSet = getCuboidBitSet(ids, rkc)
     ids.foreach(i => info.put(i.toString, AggInfo(i.toString)))
+    println("CuboidStatisticsJob-Init2-" + System.currentTimeMillis())
   }
 
   def update(r: Row): Unit = {
     idx += 1
-    if (idx <= 5 || idx % 300 == 0)
+    if (idx <= 5)
       println(r)
     updateCuboid(r)
   }
 
   def updateCuboid(r: Row): Unit = {
     // generate hash for each row key column
-
+    startMills = System.currentTimeMillis()
     var idx = 0
     while (idx < rkc) {
       val hc = hf.newHasher
       var colValue = r.get(idx).toString
       if (colValue == null) colValue = "0"
-      //add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+      // add column ordinal to the hash value to distinguish between (a,b) and (b,a)
       rowHashCodesLong(idx) = hc.putUnencodedChars(colValue).hash().padToLong() + idx
       idx += 1
     }
+    endMills = System.currentTimeMillis()
+    meter1 += (endMills - startMills)
+
 
+    startMills = System.currentTimeMillis()
     // use the row key column hash to get a consolidated hash for each cuboid
     val n = allCuboidsBitSet.length
     idx = 0
@@ -97,6 +112,8 @@ class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
       info(ids(idx).toString).cuboid.counter.addHashDirectly(value)
       idx += 1
     }
+    endMills = System.currentTimeMillis()
+    meter2 += (endMills - startMills)
   }
 
   def getCuboidBitSet(cuboidIds: List[Long], nRowKey: Int): Array[Array[Integer]] = {
@@ -120,6 +137,13 @@ class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
     }
     allCuboidsBitSet
   }
+
+  def printStat(): Unit = {
+    println("    Stats")
+    println("   i   :" + idx)
+    println("meter1 :" + meter1)
+    println("meter2 :" + meter2)
+  }
 }
 
 case class AggInfo(key: String,
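
For reference, printStat emits per-task totals like the following (numbers invented for illustration); i is the count of rows processed, meter1 the cumulative milliseconds spent hashing row key columns, and meter2 the cumulative milliseconds spent feeding the per-cuboid HLL counters:

        Stats
       i   :1000000
    meter1 :45210
    meter2 :120034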