Posted to commits@kylin.apache.org by zh...@apache.org on 2021/04/09 10:31:45 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4945 repartition encoded dataset to avoid data skew caused by single column
This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new dfd012f KYLIN-4945 repartition encoded dataset to avoid data skew caused by single column
dfd012f is described below
commit dfd012f45b9740ae0598041d7f1326e3a58c0da7
Author: zhengshengjun <sh...@sina.com>
AuthorDate: Wed Mar 24 21:16:58 2021 +0800
KYLIN-4945 repartition encoded dataset to avoid data skew caused by single column
---
.../org/apache/kylin/common/KylinConfigBase.java | 21 +++++++++++++++++
.../engine/spark/builder/CubeTableEncoder.scala | 17 ++++++++++++++
.../kylin/engine/spark/metadata/MetaData.scala | 1 +
.../engine/spark/metadata/MetadataConverter.scala | 27 +++++++++++-----------
4 files changed, 53 insertions(+), 13 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 34f8ce9..9b17dc1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3104,4 +3104,25 @@ public abstract class KylinConfigBase implements Serializable {
public int getMaxParentDatasetPersistCount() {
return Integer.parseInt(getOptional("kylin.engine.spark.parent-dataset.max.persist.count", "1"));
}
+
+ public int getRepartitionNumAfterEncode() {
+ return Integer.valueOf(getOptional("kylin.engine.spark.dataset.repartition.num.after.encoding", "0"));
+ }
+
+ /***
+ * The global dictionary is split into several buckets. To encode a column to an int value more
+ * efficiently, the source dataset is repartitioned by the to-be-encoded column into the same
+ * number of partitions as the dictionary's bucket size.
+ *
+ * This sometimes has a side effect: repartitioning by a single column is more likely to cause
+ * serious data skew, so one task can take the majority of the time in the first layer's cuboid building.
+ *
+ * When faced with this case, you can try repartitioning the encoded dataset by all
+ * RowKey columns to avoid data skew. The repartition size defaults to the max bucket
+ * size of all dictionaries, but you can also set it to another value with this option:
+ * 'kylin.engine.spark.dataset.repartition.num.after.encoding'
+ ***/
+ public boolean rePartitionEncodedDatasetWithRowKey() {
+ return Boolean.valueOf(getOptional("kylin.engine.spark.repartition.encoded.dataset", "false"));
+ }
}
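
For reference, the two options above would go into kylin.properties roughly like this; the partition count of 500 is only an illustrative value, and leaving the second option at its default of 0 keeps the largest dictionary bucket size computed during encoding:

# opt in to repartitioning the encoded dataset by all RowKey columns
kylin.engine.spark.repartition.encoded.dataset=true
# optional override of the partition count (0 = use the largest dictionary bucket size)
kylin.engine.spark.dataset.repartition.num.after.encoding=500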
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
index e4a77c3..1460632 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
@@ -43,11 +43,15 @@ object CubeTableEncoder extends Logging {
val bucketThreshold = seg.kylinconf.getGlobalDictV2ThresholdBucketSize
val minBucketSize: Long = sourceCnt / bucketThreshold
+ var repartitionSizeAfterEncode = 0;
cols.asScala.foreach(
ref => {
val globalDict = new NGlobalDictionary(seg.project, ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory)
val bucketSize = globalDict.getBucketSizeOrDefault(seg.kylinconf.getGlobalDictV2MinHashPartitions)
val enlargedBucketSize = (((minBucketSize / bucketSize) + 1) * bucketSize).toInt
+ if (enlargedBucketSize > repartitionSizeAfterEncode) {
+ repartitionSizeAfterEncode = enlargedBucketSize;
+ }
val encodeColRef = convertFromDot(ref.identity)
val columnIndex = structType.fieldIndex(encodeColRef)
@@ -63,7 +67,20 @@ object CubeTableEncoder extends Logging {
.select(columns: _*)
}
)
+
ds.sparkSession.sparkContext.setJobDescription(null)
+
+ // Repartitioning by a single column in the dictionary-encoding step above can easily cause data skew; add a step to avoid that.
+ if (!cols.isEmpty && seg.kylinconf.rePartitionEncodedDatasetWithRowKey) {
+ val colsInDS = partitionedDs.schema.map(_.name)
+ val rowKeyColRefs = seg.allRowKeyCols.map(colDesc => convertFromDot(colDesc.identity)).filter(colsInDS.contains).map(col)
+ // If a partition count is set in config, use it; otherwise keep the largest partition count from the dictionary-encoding step.
+ if (seg.kylinconf.getRepartitionNumAfterEncode > 0) {
+ repartitionSizeAfterEncode = seg.kylinconf.getRepartitionNumAfterEncode;
+ }
+ logInfo(s"repartition encoded dataset to $repartitionSizeAfterEncode partitions to avoid data skew")
+ partitionedDs = partitionedDs.repartition(repartitionSizeAfterEncode, rowKeyColRefs.toArray: _*)
+ }
partitionedDs
}
}
\ No newline at end of file
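
To make the repartitioning idea concrete outside of Kylin, here is a minimal Spark sketch that repartitions a dataset by several RowKey-like columns instead of the single encoded column. The object name, column names, toy rows and the partition count of 8 are invented for illustration; in the patch the count defaults to the largest enlarged bucket size found while encoding dictionaries, unless kylin.engine.spark.dataset.repartition.num.after.encoding overrides it.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object RepartitionByRowKeysSketch {
  // Repartition by every RowKey column present in the dataset, so the hash
  // partitioner spreads values that are skewed in any single column.
  def repartitionByRowKeys(ds: DataFrame, rowKeyCols: Seq[String], numPartitions: Int): DataFrame = {
    val colsInDs = ds.columns.toSeq
    val present = rowKeyCols.filter(colsInDs.contains)
    if (present.isEmpty) ds else ds.repartition(numPartitions, present.map(col): _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("repartition-sketch").getOrCreate()
    import spark.implicits._
    // Toy data: SELLER_ID is heavily skewed, so repartitioning by it alone
    // would push most rows into one partition.
    val ds = Seq(("s1", "2021-03-01", 10), ("s1", "2021-03-02", 20), ("s2", "2021-03-01", 5))
      .toDF("SELLER_ID", "PART_DT", "PRICE")
    val balanced = repartitionByRowKeys(ds, Seq("SELLER_ID", "PART_DT"), numPartitions = 8)
    println(balanced.rdd.getNumPartitions) // 8
    spark.stop()
  }
}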
diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
index e81aa8e..030834b 100644
--- a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
+++ b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
@@ -111,6 +111,7 @@ case class SegmentInfo(id: String,
allDictColumns: Set[ColumnDesc],
partitionExp: String,
filterCondition: String,
+ allRowKeyCols: List[ColumnDesc],
var snapshotInfo: Map[String, String] = Map.empty[String, String]) {
def updateLayout(layoutEntity: LayoutEntity): Unit = {
diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
index 0d12c25..a50a835 100644
--- a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
+++ b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
@@ -37,7 +37,7 @@ import scala.collection.mutable
object MetadataConverter {
def getSegmentInfo(cubeInstance: CubeInstance, segmentId: String, segmentName: String, identifier: String): SegmentInfo = {
- val allColumnDesc = extractAllColumnDesc(cubeInstance)
+ val (allColumnDesc, allRowKeyCols) = extractAllColumnDesc(cubeInstance)
val (layoutEntities, measure) = extractEntityAndMeasures(cubeInstance)
val dictColumn = measure.values.filter(_.returnType.dataType.equals("bitmap"))
.map(_.pra.head).toSet
@@ -47,7 +47,8 @@ object MetadataConverter {
dictColumn,
dictColumn,
extractPartitionExp(cubeInstance.getSegmentById(segmentId)),
- extractFilterCondition(cubeInstance.getSegmentById(segmentId)))
+ extractFilterCondition(cubeInstance.getSegmentById(segmentId)),
+ allRowKeyCols.asScala.values.toList)
}
def getCubeUpdate(segmentInfo: SegmentInfo): CubeUpdate = {
@@ -94,25 +95,25 @@ object MetadataConverter {
tb.getColumns.asScala.map(ref => toColumnDesc(ref = ref)).toList, tb.getAlias, tb.getTableDesc.getSourceType, addInfo)
}
- def extractAllColumnDesc(cubeInstance: CubeInstance): java.util.LinkedHashMap[Integer, ColumnDesc] = {
- val dimensionIndex = new util.LinkedHashMap[Integer, ColumnDesc]()
+ def extractAllColumnDesc(cubeInstance: CubeInstance): (java.util.LinkedHashMap[Integer, ColumnDesc],
+ java.util.LinkedHashMap[Integer, ColumnDesc]) = {
+ //use LinkedHashMap to keep RowKey columns in the same order as their bit indexes
+ val dimensions = new util.LinkedHashMap[Integer, ColumnDesc]()
val columns = cubeInstance.getDescriptor
.getRowkey
.getRowKeyColumns
val dimensionMapping = columns
.map(co => (co.getColRef, co.getBitIndex))
val set = dimensionMapping.map(_._1).toSet
- val refs = cubeInstance.getAllColumns.asScala.diff(set)
+ val measureCols = cubeInstance.getAllColumns.asScala.diff(set)
.zipWithIndex
.map(tp => (tp._1, tp._2 + dimensionMapping.length))
-
- val columnIDTuples = dimensionMapping ++ refs
- val colToIndex = columnIDTuples.toMap
- columnIDTuples
- .foreach { co =>
- dimensionIndex.put(co._2, toColumnDesc(co._1, co._2, set.contains(co._1)))
- }
- dimensionIndex
+ dimensionMapping.foreach(co => dimensions.put(co._2, toColumnDesc(co._1, co._2, true)))
+ val allColumns = new util.LinkedHashMap[Integer, ColumnDesc]()
+ //keep RowKey columns before measure columns in LinkedHashMap
+ allColumns.putAll(dimensions)
+ measureCols.foreach(co => allColumns.put(co._2, toColumnDesc(co._1, co._2, false)))
+ (allColumns, dimensions)
}
def toLayoutEntity(cubeInstance: CubeInstance, cuboid: Cuboid): LayoutEntity = {
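
As a small standalone illustration of the ordering comments in extractAllColumnDesc (not Kylin code; the names and indexes below are made up): java.util.LinkedHashMap iterates in insertion order, so RowKey columns inserted first stay ordered by bit index and come ahead of the measure columns appended afterwards.

import java.util
import scala.collection.JavaConverters._

object ColumnOrderSketch {
  def main(args: Array[String]): Unit = {
    // RowKey columns, inserted in bit-index order
    val dimensions = new util.LinkedHashMap[Integer, String]()
    dimensions.put(0, "ROWKEY_COL_A")
    dimensions.put(1, "ROWKEY_COL_B")

    // All columns: RowKey columns first, then measure columns
    val allColumns = new util.LinkedHashMap[Integer, String]()
    allColumns.putAll(dimensions)
    allColumns.put(2, "MEASURE_COL_C")

    allColumns.asScala.foreach { case (idx, name) => println(s"$idx -> $name") }
    // prints: 0 -> ROWKEY_COL_A, 1 -> ROWKEY_COL_B, 2 -> MEASURE_COL_C
  }
}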