Posted to commits@kylin.apache.org by zh...@apache.org on 2021/04/09 10:31:45 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4945 repartition encoded dataset to avoid data skew caused by single column

This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new dfd012f  KYLIN-4945 repartition encoded dataset to avoid data skew caused by single column
dfd012f is described below

commit dfd012f45b9740ae0598041d7f1326e3a58c0da7
Author: zhengshengjun <sh...@sina.com>
AuthorDate: Wed Mar 24 21:16:58 2021 +0800

    KYLIN-4945 repartition encoded dataset to avoid data skew caused by single column
---
 .../org/apache/kylin/common/KylinConfigBase.java   | 21 +++++++++++++++++
 .../engine/spark/builder/CubeTableEncoder.scala    | 17 ++++++++++++++
 .../kylin/engine/spark/metadata/MetaData.scala     |  1 +
 .../engine/spark/metadata/MetadataConverter.scala  | 27 +++++++++++-----------
 4 files changed, 53 insertions(+), 13 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 34f8ce9..9b17dc1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3104,4 +3104,25 @@ public abstract class KylinConfigBase implements Serializable {
     public int getMaxParentDatasetPersistCount() {
         return Integer.parseInt(getOptional("kylin.engine.spark.parent-dataset.max.persist.count", "1"));
     }
+
+    public int getRepartitionNumAfterEncode() {
+        return Integer.valueOf(getOptional("kylin.engine.spark.dataset.repartition.num.after.encoding", "0"));
+    }
+
+    /***
+     * A global dictionary is split into several buckets. To encode a column to an int value more
+     * efficiently, the source dataset is repartitioned by the to-be-encoded column into the same
+     * number of partitions as the dictionary's bucket size.
+     *
+     * This can have a side effect: repartitioning by a single column is more likely to cause
+     * serious data skew, leaving one task to take the majority of the time in the first layer's cuboid building.
+     *
+     * When faced with this case, you can try repartitioning the encoded dataset by all
+     * RowKey columns to avoid the skew. The repartition size defaults to the max bucket
+     * size of all dictionaries, but you can also set it to another value via this option:
+     * 'kylin.engine.spark.dataset.repartition.num.after.encoding'
+     ***/
+    public boolean rePartitionEncodedDatasetWithRowKey() {
+        return Boolean.valueOf(getOptional("kylin.engine.spark.repartition.encoded.dataset", "false"));
+    }
 }
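
To make the behavior of the two new options concrete, here is a minimal sketch (not Kylin source; the object, function, and argument names are illustrative): repartitioning only happens when 'kylin.engine.spark.repartition.encoded.dataset' is true, and a positive 'kylin.engine.spark.dataset.repartition.num.after.encoding' overrides the default, which is the largest enlarged bucket size seen while encoding.

// Minimal sketch of how the two options combine; names are illustrative, not Kylin source.
object RepartitionConfigSketch {
  // repartitionEnabled <- kylin.engine.spark.repartition.encoded.dataset
  // configuredNum      <- kylin.engine.spark.dataset.repartition.num.after.encoding
  // bucketSizes        <- enlarged bucket sizes computed per dictionary-encoded column
  def resolveRepartitionNum(repartitionEnabled: Boolean, configuredNum: Int, bucketSizes: Seq[Int]): Option[Int] = {
    if (!repartitionEnabled || bucketSizes.isEmpty) None // feature off, or nothing was dictionary-encoded
    else if (configuredNum > 0) Some(configuredNum)      // explicit override wins
    else Some(bucketSizes.max)                           // default: largest enlarged bucket size
  }

  def main(args: Array[String]): Unit = {
    // No override configured: the largest bucket size (500) is used.
    println(resolveRepartitionNum(repartitionEnabled = true, configuredNum = 0, bucketSizes = Seq(100, 500, 300)))
    // An explicit override takes precedence.
    println(resolveRepartitionNum(repartitionEnabled = true, configuredNum = 1000, bucketSizes = Seq(100, 500, 300)))
    // Feature disabled: no repartition.
    println(resolveRepartitionNum(repartitionEnabled = false, configuredNum = 1000, bucketSizes = Seq(100, 500, 300)))
  }
}
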
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
index e4a77c3..1460632 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
@@ -43,11 +43,15 @@ object CubeTableEncoder extends Logging {
     val bucketThreshold = seg.kylinconf.getGlobalDictV2ThresholdBucketSize
     val minBucketSize: Long = sourceCnt / bucketThreshold
 
+    var repartitionSizeAfterEncode = 0;
     cols.asScala.foreach(
       ref => {
         val globalDict = new NGlobalDictionary(seg.project, ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory)
         val bucketSize = globalDict.getBucketSizeOrDefault(seg.kylinconf.getGlobalDictV2MinHashPartitions)
         val enlargedBucketSize = (((minBucketSize / bucketSize) + 1) * bucketSize).toInt
+        if (enlargedBucketSize > repartitionSizeAfterEncode) {
+          repartitionSizeAfterEncode = enlargedBucketSize;
+        }
 
         val encodeColRef = convertFromDot(ref.identity)
         val columnIndex = structType.fieldIndex(encodeColRef)
@@ -63,7 +67,20 @@ object CubeTableEncoder extends Logging {
           .select(columns: _*)
       }
     )
+
     ds.sparkSession.sparkContext.setJobDescription(null)
+
+    // repartitioning by a single column during the dict encode step above is more likely to cause data skew; add a step to avoid such cases.
+    if (!cols.isEmpty && seg.kylinconf.rePartitionEncodedDatasetWithRowKey) {
+      val colsInDS = partitionedDs.schema.map(_.name)
+      val rowKeyColRefs = seg.allRowKeyCols.map(colDesc => convertFromDot(colDesc.identity)).filter(colsInDS.contains).map(col)
+      // if not set in config, use the largest partition num from the dict encode step
+      if (seg.kylinconf.getRepartitionNumAfterEncode > 0) {
+        repartitionSizeAfterEncode = seg.kylinconf.getRepartitionNumAfterEncode;
+      }
+      logInfo(s"repartition encoded dataset to $repartitionSizeAfterEncode partitions to avoid data skew")
+      partitionedDs = partitionedDs.repartition(repartitionSizeAfterEncode, rowKeyColRefs.toArray: _*)
+    }
     partitionedDs
   }
 }
\ No newline at end of file
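
The skew this commit addresses can be reproduced outside Kylin. Below is a self-contained Spark sketch with made-up data and column names (not Kylin source): repartitioning by a single heavily skewed column leaves almost all rows in one partition, while repartitioning by the full set of RowKey-like columns spreads them evenly, which is what the new step does for the encoded dataset.

// Standalone sketch with hypothetical data: single-column vs multi-column repartitioning.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, when}

object RepartitionSkewSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("skew-sketch").getOrCreate()

    // 1,000,000 rows where "country" is heavily skewed towards one value ("CN" for ~95% of rows).
    val src = spark.range(0, 1000000).select(
      (col("id") % 10000).as("user_id"),
      when(col("id") % 100 < 95, "CN").otherwise("US").as("country"))

    // Repartitioning by the skewed column alone pushes ~95% of the rows into a single partition.
    val bySingleCol = src.repartition(20, col("country"))

    // Repartitioning by all RowKey-like columns spreads the same rows far more evenly.
    val byRowKeyCols = src.repartition(20, col("country"), col("user_id"))

    def largestPartition(df: DataFrame): Long =
      df.rdd.mapPartitions(it => Iterator(it.size.toLong)).collect().max

    println(s"largest partition, single column : ${largestPartition(bySingleCol)}")
    println(s"largest partition, rowkey columns: ${largestPartition(byRowKeyCols)}")
    spark.stop()
  }
}

On a local run the single-column variant should report one partition holding roughly 95% of the rows, while the multi-column variant should stay close to 1/20 of the data per partition.
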
diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
index e81aa8e..030834b 100644
--- a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
+++ b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
@@ -111,6 +111,7 @@ case class SegmentInfo(id: String,
                        allDictColumns: Set[ColumnDesc],
                        partitionExp: String,
                        filterCondition: String,
+                       allRowKeyCols: List[ColumnDesc],
                        var snapshotInfo: Map[String, String] = Map.empty[String, String]) {
 
   def updateLayout(layoutEntity: LayoutEntity): Unit = {
diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
index 0d12c25..a50a835 100644
--- a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
+++ b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
@@ -37,7 +37,7 @@ import scala.collection.mutable
 
 object MetadataConverter {
   def getSegmentInfo(cubeInstance: CubeInstance, segmentId: String, segmentName: String, identifier: String): SegmentInfo = {
-    val allColumnDesc = extractAllColumnDesc(cubeInstance)
+    val (allColumnDesc, allRowKeyCols) = extractAllColumnDesc(cubeInstance)
     val (layoutEntities, measure) = extractEntityAndMeasures(cubeInstance)
     val dictColumn = measure.values.filter(_.returnType.dataType.equals("bitmap"))
       .map(_.pra.head).toSet
@@ -47,7 +47,8 @@ object MetadataConverter {
       dictColumn,
       dictColumn,
       extractPartitionExp(cubeInstance.getSegmentById(segmentId)),
-      extractFilterCondition(cubeInstance.getSegmentById(segmentId)))
+      extractFilterCondition(cubeInstance.getSegmentById(segmentId)),
+      allRowKeyCols.asScala.values.toList)
   }
 
   def getCubeUpdate(segmentInfo: SegmentInfo): CubeUpdate = {
@@ -94,25 +95,25 @@ object MetadataConverter {
       tb.getColumns.asScala.map(ref => toColumnDesc(ref = ref)).toList, tb.getAlias, tb.getTableDesc.getSourceType, addInfo)
   }
 
-  def extractAllColumnDesc(cubeInstance: CubeInstance): java.util.LinkedHashMap[Integer, ColumnDesc] = {
-    val dimensionIndex = new util.LinkedHashMap[Integer, ColumnDesc]()
+  def extractAllColumnDesc(cubeInstance: CubeInstance): (java.util.LinkedHashMap[Integer, ColumnDesc],
+    java.util.LinkedHashMap[Integer, ColumnDesc]) = {
+    // use a LinkedHashMap to keep RowKey columns in the same order as their bit indexes
+    val dimensions = new util.LinkedHashMap[Integer, ColumnDesc]()
     val columns = cubeInstance.getDescriptor
       .getRowkey
       .getRowKeyColumns
     val dimensionMapping = columns
       .map(co => (co.getColRef, co.getBitIndex))
     val set = dimensionMapping.map(_._1).toSet
-    val refs = cubeInstance.getAllColumns.asScala.diff(set)
+    val measureCols = cubeInstance.getAllColumns.asScala.diff(set)
       .zipWithIndex
       .map(tp => (tp._1, tp._2 + dimensionMapping.length))
-
-    val columnIDTuples = dimensionMapping ++ refs
-    val colToIndex = columnIDTuples.toMap
-    columnIDTuples
-      .foreach { co =>
-        dimensionIndex.put(co._2, toColumnDesc(co._1, co._2, set.contains(co._1)))
-      }
-    dimensionIndex
+    dimensionMapping.foreach(co => dimensions.put(co._2, toColumnDesc(co._1, co._2, true)))
+    val allColumns = new util.LinkedHashMap[Integer, ColumnDesc]()
+    // keep RowKey columns before measure columns in the LinkedHashMap
+    allColumns.putAll(dimensions)
+    measureCols.foreach(co => allColumns.put(co._2, toColumnDesc(co._1, co._2, false)))
+    (allColumns, dimensions)
   }
 
   def toLayoutEntity(cubeInstance: CubeInstance, cuboid: Cuboid): LayoutEntity = {
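
The LinkedHashMap ordering the new code relies on can be illustrated with a small standalone sketch (the column names are hypothetical): insertion order is preserved, so RowKey columns put in bit-index order stay in that order and remain ahead of the measure columns appended afterwards.

// Standalone sketch of the insertion-order guarantee; column names are hypothetical.
import java.util
import scala.collection.JavaConverters._

object ColumnOrderSketch {
  def main(args: Array[String]): Unit = {
    val dimensions = new util.LinkedHashMap[Integer, String]()
    // RowKey columns inserted in bit-index order.
    dimensions.put(0, "ORDER_DATE")
    dimensions.put(1, "SELLER_ID")
    dimensions.put(2, "REGION")

    val allColumns = new util.LinkedHashMap[Integer, String]()
    allColumns.putAll(dimensions)   // RowKey columns first ...
    allColumns.put(3, "PRICE")      // ... then the remaining (measure) columns.
    allColumns.put(4, "ITEM_COUNT")

    // Iteration follows insertion order: RowKey columns by bit index, then measures.
    allColumns.asScala.foreach { case (idx, name) => println(s"$idx -> $name") }
  }
}
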