You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/01/04 06:05:07 UTC
[kylin] 04/06: KYLIN-4818 Performance profile for CuboidStatisticsJob
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d8e5db8457bf5bbd1ae05b9360c5ae48038b57fb
Author: XiaoxiangYu <xx...@apache.org>
AuthorDate: Mon Dec 7 21:30:43 2020 +0800
KYLIN-4818 Performance profile for CuboidStatisticsJob
---
.../kylin/engine/mr/common/CubeStatsReader.java | 11 +++++---
.../engine/mr/common/CuboidRecommenderUtil.java | 6 ++--
.../org/apache/kylin/common/KylinConfigBase.java | 20 ++++++++++++--
.../src/main/resources/kylin-defaults.properties | 2 +-
.../cube/cuboid/algorithm/CuboidRecommender.java | 3 +-
.../cuboid/algorithm/greedy/GreedyAlgorithm.java | 16 +++++------
.../org/apache/kylin/cube/kv/CubeDimEncMap.java | 2 ++
.../kylin/engine/spark/job/NSparkExecutable.java | 7 +++--
.../kylin/engine/spark/job/CubeBuildJob.java | 6 +++-
.../engine/spark/job/CuboidStatisticsJob.scala | 32 +++++++++++++++++++---
10 files changed, 80 insertions(+), 25 deletions(-)
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index e63fc1a..1a1dd11 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -171,7 +171,6 @@ public class CubeStatsReader {
return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage);
}
- // return map of Cuboid ID => MB
public Map<Long, Double> getCuboidSizeMap() {
return getCuboidSizeMap(false);
}
@@ -218,7 +217,12 @@ public class CubeStatsReader {
final Long baseCuboidRowCount = rowCountMap.get(baseCuboid.getId());
for (int i = 0; i < columnList.size(); i++) {
- rowkeyColumnSize.add(dimEncMap.get(columnList.get(i)).getLengthOfEncoding());
+ /*
+ * A workaround for the fact that Kylin 4 does not support self-defined encoding;
+ * encoding is handled by Parquet (https://github.com/apache/parquet-format/blob/master/Encodings.md) instead.
+ * It is complex and hard to calculate the real size of a specific literal value, so 4 is used as a rough estimate.
+ */
+ rowkeyColumnSize.add(4);
}
Map<Long, Double> sizeMap = Maps.newHashMap();
@@ -360,11 +364,10 @@ public class CubeStatsReader {
}
}
- double cuboidSizeRatio = kylinConf.getJobCuboidSizeRatio();
double cuboidSizeMemHungryRatio = kylinConf.getJobCuboidSizeCountDistinctRatio();
double cuboidSizeTopNRatio = kylinConf.getJobCuboidSizeTopNRatio();
- double ret = (1.0 * normalSpace * rowCount * cuboidSizeRatio
+ double ret = (1.0 * normalSpace * rowCount
+ 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount
+ 1.0 * topNSpace * rowCount * cuboidSizeTopNRatio) / (1024L * 1024L);
return ret;
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
index 6d9b748..f6ae332 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
@@ -60,8 +60,10 @@ public class CuboidRecommenderUtil {
Set<Long> mandatoryCuboids = segment.getCubeDesc().getMandatoryCuboids();
String key = cube.getName();
- CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(),
- cubeStatsReader.getCuboidSizeMap()).setMandatoryCuboids(mandatoryCuboids).setBPUSMinBenefitRatio(segment.getConfig().getCubePlannerBPUSMinBenefitRatio()).build();
+ CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(), cubeStatsReader.getCuboidSizeMap())
+ .setMandatoryCuboids(mandatoryCuboids)
+ .setBPUSMinBenefitRatio(segment.getConfig().getCubePlannerBPUSMinBenefitRatio())
+ .build();
return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, segment.getConfig(),
!mandatoryCuboids.isEmpty());
}
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index cb0d863..dc91a7f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -715,11 +715,12 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.cube.size-estimate-enable-optimize", "false"));
}
+ @ConfigTag({ConfigTag.Tag.DEPRECATED, ConfigTag.Tag.NOT_CLEAR})
public double getJobCuboidSizeRatio() {
return Double.parseDouble(getOptional("kylin.cube.size-estimate-ratio", "0.25"));
}
- @Deprecated
+ @ConfigTag({ConfigTag.Tag.DEPRECATED, ConfigTag.Tag.NOT_CLEAR})
public double getJobCuboidSizeMemHungryRatio() {
return Double.parseDouble(getOptional("kylin.cube.size-estimate-memhungry-ratio", "0.05"));
}
@@ -801,7 +802,7 @@ public abstract class KylinConfigBase implements Serializable {
// ============================================================================
public boolean isCubePlannerEnabled() {
- return Boolean.parseBoolean(getOptional("kylin.cube.cubeplanner.enabled", TRUE));
+ return Boolean.parseBoolean(getOptional("kylin.cube.cubeplanner.enabled", FALSE));
}
public boolean isCubePlannerEnabledForExistingCube() {
@@ -833,6 +834,14 @@ public abstract class KylinConfigBase implements Serializable {
}
/**
* Columnar storage formats, such as Apache Parquet, often use encoding and compression to make data smaller,
* which affects the CuboidRecommendAlgorithm.
+ */
+ public double getStorageCompressionRatio() {
+ return Double.parseDouble(getOptional("kylin.cube.cubeplanner.storage.compression.ratio", "0.2"));
+ }
+
+ /**
* Get the assigned server array, which is an empty string array by default.
* @return
*/
@@ -2638,6 +2647,13 @@ public abstract class KylinConfigBase implements Serializable {
return getFileName(kylinHome + File.separator + "lib", PARQUET_JOB_JAR_NAME_PATTERN);
}
+ /**
* Use https://github.com/spektom/spark-flamegraph for Spark profiling.
+ */
+ public String getSparkSubmitCmd() {
+ return getOptional("kylin.engine.spark-cmd", null);
+ }
+
public void overrideKylinParquetJobJarPath(String path) {
logger.info("override {} to {}", KYLIN_ENGINE_PARQUET_JOB_JAR, path);
System.setProperty(KYLIN_ENGINE_PARQUET_JOB_JAR, path);
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index e33dbe9..c5cd317 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -153,7 +153,7 @@ kylin.cube.algorithm.inmem-auto-optimize=true
kylin.cube.aggrgroup.max-combination=32768
-kylin.cube.cubeplanner.enabled=true
+kylin.cube.cubeplanner.enabled=false
kylin.cube.cubeplanner.enabled-for-existing-cube=false
kylin.cube.cubeplanner.expansion-threshold=15.0
kylin.cube.cubeplanner.recommend-cache-max-size=200
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
index 1f3eaaf..c5f7101 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
@@ -143,7 +143,8 @@ public class CuboidRecommender {
long startTime = System.currentTimeMillis();
logger.info("Cube Planner Algorithm started at {}", startTime);
- List<Long> recommendCuboidList = algorithm.recommend(kylinConf.getCubePlannerExpansionRateThreshold());
+ List<Long> recommendCuboidList = algorithm.recommend(
+ kylinConf.getCubePlannerExpansionRateThreshold() / kylinConf.getStorageCompressionRatio());
logger.info("Cube Planner Algorithm ended at {}", System.currentTimeMillis() - startTime);
if (recommendCuboidList.size() < allCuboidCount) {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/greedy/GreedyAlgorithm.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/greedy/GreedyAlgorithm.java
index 0a48eea..34b73b9 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/greedy/GreedyAlgorithm.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/greedy/GreedyAlgorithm.java
@@ -92,8 +92,8 @@ public class GreedyAlgorithm extends AbstractRecommendAlgorithm {
remaining.remove(best.getCuboidId());
benefitPolicy.propagateAggregationCost(best.getCuboidId(), selected);
round++;
- if (logger.isTraceEnabled()) {
- logger.trace("Recommend in round {} : {}", round, best);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Recommend in round {} : {}", round, best);
}
} else {
doesRemainSpace = false;
@@ -111,15 +111,15 @@ public class GreedyAlgorithm extends AbstractRecommendAlgorithm {
"There should be no intersection between excluded list and selected list.");
logger.info("Greedy Algorithm finished.");
- if (logger.isTraceEnabled()) {
- logger.trace("Excluded cuboidId size: {}", excluded.size());
- logger.trace("Excluded cuboidId detail:");
+ if (logger.isDebugEnabled()) {
+ logger.debug("Excluded cuboidId size: {}", excluded.size());
+ logger.debug("Excluded cuboidId detail:");
for (Long cuboid : excluded) {
- logger.trace("cuboidId {} and Cost: {} and Space: {}", cuboid,
+ logger.debug("cuboidId {} and Cost: {} and Space: {}", cuboid,
cuboidStats.getCuboidQueryCost(cuboid), cuboidStats.getCuboidSize(cuboid));
}
- logger.trace("Total Space: {}", spaceLimit - remainingSpace);
- logger.trace("Space Expansion Rate: {}", (spaceLimit - remainingSpace) / cuboidStats.getBaseCuboidSize());
+ logger.debug("Total Space: {}", spaceLimit - remainingSpace);
+ logger.debug("Space Expansion Rate: {}", (spaceLimit - remainingSpace) / cuboidStats.getBaseCuboidSize());
}
return Lists.newArrayList(selected);
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
index cea9db1..7342a69 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
@@ -20,6 +20,7 @@ package org.apache.kylin.cube.kv;
import java.util.Map;
+import org.apache.kylin.common.annotation.Clarification;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
+@Clarification(deprecated = true, msg = "Useless code in Kylin 4")
public class CubeDimEncMap implements IDimensionEncodingMap, java.io.Serializable {
private static final Logger logger = LoggerFactory.getLogger(CubeDimEncMap.class);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 393c10d..4558cf0 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -347,7 +347,10 @@ public class NSparkExecutable extends AbstractExecutable {
protected String generateSparkCmd(KylinConfig config, String hadoopConf, String jars, String kylinJobJar,
String appArgs) {
StringBuilder sb = new StringBuilder();
- sb.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.engine.spark.application.SparkEntry ");
+
+ String sparkSubmitCmd = config.getSparkSubmitCmd() != null ?
+ config.getSparkSubmitCmd() : KylinConfig.getSparkHome() + "/bin/spark-submit";
+ sb.append("export HADOOP_CONF_DIR=%s && %s --class org.apache.kylin.engine.spark.application.SparkEntry ");
Map<String, String> sparkConfs = getSparkConfigOverride(config);
for (Entry<String, String> entry : sparkConfs.entrySet()) {
@@ -362,7 +365,7 @@ public class NSparkExecutable extends AbstractExecutable {
sb.append("--files ").append(config.sparkUploadFiles()).append(" ");
sb.append("--name job_step_%s ");
sb.append("--jars %s %s %s");
- String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, KylinConfig.getSparkHome(), getId(), jars, kylinJobJar,
+ String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, sparkSubmitCmd, getId(), jars, kylinJobJar,
appArgs);
// SparkConf still have a change to be changed in CubeBuildJob.java (Spark Driver)
logger.info("spark submit cmd: {}", cmd);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 7df8cf0..9504ac8 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -22,11 +22,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -140,7 +142,9 @@ public class CubeBuildJob extends SparkApplication {
// 1.2 Save cuboid statistics
String jobWorkingDirPath = JobBuilderSupport.getJobWorkingDir(cubeInstance.getConfig().getHdfsWorkingDirectory(), jobId);
Path statisticsDir = new Path(jobWorkingDirPath + "/" + firstSegmentId + "/" + CFG_OUTPUT_STATISTICS);
- CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), statisticsDir, hllMap, 1);
+ Optional<HLLCounter> hll = hllMap.values().stream().max(Comparator.comparingLong(HLLCounter::getCountEstimate));
+ long rc = hll.map(HLLCounter::getCountEstimate).orElse(1L);
+ CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), statisticsDir, hllMap, 1, rc);
FileSystem fs = HadoopUtil.getWorkingFileSystem();
ResourceStore rs = ResourceStore.getStore(config);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
index c08a6ae..3b963e4 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
@@ -49,41 +49,56 @@ object CuboidStatisticsJob {
class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
private val info = mutable.Map[String, AggInfo]()
- val allCuboidsBitSet: Array[Array[Integer]] = getCuboidBitSet(ids, rkc)
+ private var allCuboidsBitSet: Array[Array[Integer]] = Array()
private val hf: HashFunction = Hashing.murmur3_128
private val rowHashCodesLong = new Array[Long](rkc)
private var idx = 0
+ private var meter1 = 0L
+ private var meter2 = 0L
+ private var startMills = 0L
+ private var endMills = 0L
+
def statisticsWithinPartition(rows: Iterator[Row]): Iterator[AggInfo] = {
init()
+ println("CuboidStatisticsJob-statisticsWithinPartition1-" + System.currentTimeMillis())
rows.foreach(update)
+ printStat()
+ println("CuboidStatisticsJob-statisticsWithinPartition2-" + System.currentTimeMillis())
info.valuesIterator
}
def init(): Unit = {
+ println("CuboidStatisticsJob-Init1-" + System.currentTimeMillis())
+ allCuboidsBitSet = getCuboidBitSet(ids, rkc)
ids.foreach(i => info.put(i.toString, AggInfo(i.toString)))
+ println("CuboidStatisticsJob-Init2-" + System.currentTimeMillis())
}
def update(r: Row): Unit = {
idx += 1
- if (idx <= 5 || idx % 300 == 0)
+ if (idx <= 5)
println(r)
updateCuboid(r)
}
def updateCuboid(r: Row): Unit = {
// generate hash for each row key column
-
+ startMills = System.currentTimeMillis()
var idx = 0
while (idx < rkc) {
val hc = hf.newHasher
var colValue = r.get(idx).toString
if (colValue == null) colValue = "0"
- //add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+ // add column ordinal to the hash value to distinguish between (a,b) and (b,a)
rowHashCodesLong(idx) = hc.putUnencodedChars(colValue).hash().padToLong() + idx
idx += 1
}
+ endMills = System.currentTimeMillis()
+ meter1 += (endMills - startMills)
+
+ startMills = System.currentTimeMillis()
// use the row key column hash to get a consolidated hash for each cuboid
val n = allCuboidsBitSet.length
idx = 0
@@ -97,6 +112,8 @@ class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
info(ids(idx).toString).cuboid.counter.addHashDirectly(value)
idx += 1
}
+ endMills = System.currentTimeMillis()
+ meter2 += (endMills - startMills)
}
def getCuboidBitSet(cuboidIds: List[Long], nRowKey: Int): Array[Array[Integer]] = {
@@ -120,6 +137,13 @@ class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
}
allCuboidsBitSet
}
+
+ def printStat(): Unit = {
+ println(" Stats")
+ println(" i :" + idx)
+ println("meter1 :" + meter1)
+ println("meter2 :" + meter2)
+ }
}
case class AggInfo(key: String,