You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@kylin.apache.org by GitBox <gi...@apache.org> on 2021/03/30 03:05:04 UTC

[GitHub] [kylin] zhengshengjun commented on a change in pull request #1603: KYLIN-4918 Support Cube Level configuration in FilePruner

zhengshengjun commented on a change in pull request #1603:
URL: https://github.com/apache/kylin/pull/1603#discussion_r603747164



##########
File path: kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
##########
@@ -78,6 +77,8 @@ class FilePruner(cubeInstance: CubeInstance,
                  val options: Map[String, String])
   extends FileIndex with ResetShufflePartition with Logging {
 
+  val MAX_SHARDING_SIZE_PER_TASK: Long = cubeInstance.getConfig.getMaxShardingSizeMBPerTask * 1024 * 1024

Review comment:
       done

##########
File path: kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
##########
@@ -23,17 +23,14 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.utils.SparderUtils
 
 trait ResetShufflePartition extends Logging {
-  val PARTITION_SPLIT_BYTES: Long =
-    KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 // 64MB
 
-  def setShufflePartitions(bytes: Long, sparkSession: SparkSession): Unit = {
+  def setShufflePartitions(bytes: Long, sparkSession: SparkSession, conf: KylinConfig): Unit = {
     QueryContextFacade.current().addAndGetSourceScanBytes(bytes)
     val defaultParallelism = SparderUtils.getTotalCore(sparkSession.sparkContext.getConf)
-    val kylinConfig = KylinConfig.getInstanceFromEnv
-    val partitionsNum = if (kylinConfig.getSparkSqlShufflePartitions != -1) {
-      kylinConfig.getSparkSqlShufflePartitions
+    val partitionsNum = if (conf.getSparkSqlShufflePartitions != -1) {
+      conf.getSparkSqlShufflePartitions
     } else {
-      Math.min(QueryContextFacade.current().getSourceScanBytes / PARTITION_SPLIT_BYTES + 1,
+      Math.min(QueryContextFacade.current().getSourceScanBytes / (conf.getQueryPartitionSplitSizeMB * 1024 * 1024) + 1,

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org