Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/12 09:28:00 UTC

[kylin] 04/17: [DIRTY] fix partition snapshot build on spark serverless

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit c14359ccd3b55704f4168b70cc9e8df89732b751
Author: xingjian <xi...@kyligence.io>
AuthorDate: Thu Nov 10 10:08:27 2022 +0800

    [DIRTY] fix partition snapshot build on spark serverless
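
    On Spark serverless deployments the Hadoop configuration returned by
    SparderEnv.getHadoopConfiguration() may not carry the job's actual
    file system settings, so whenever the cluster manager class name
    contains "AWSServerless" the build falls back to the configuration
    held by the SparkSession's SparkContext. A minimal sketch of the
    selection pattern the diff inlines (resolveHadoopConf is a
    hypothetical helper, not part of this commit):

        import org.apache.hadoop.conf.Configuration
        import org.apache.spark.sql.SparkSession

        // Prefer the SparkContext's configuration on AWS serverless;
        // otherwise keep the one supplied by the default environment.
        def resolveHadoopConf(ss: SparkSession,
                              clusterManagerClassName: String,
                              defaultConf: Configuration): Configuration =
          if (clusterManagerClassName.contains("AWSServerless"))
            ss.sparkContext.hadoopConfiguration
          else
            defaultConf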
---
 .../engine/spark/builder/SnapshotBuilder.scala     | 31 ++++++++++++----------
 1 file changed, 17 insertions(+), 14 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
index ce5a080601..b077112498 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
@@ -18,29 +18,28 @@
 
 package org.apache.kylin.engine.spark.builder
 
-import java.io.IOException
-import java.util
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Executors}
-import java.util.{Objects, UUID}
 import com.google.common.collect.Maps
-import org.apache.kylin.engine.spark.NSparkCubingEngine
-import org.apache.kylin.engine.spark.job.{DFChooser, KylinBuildEnv}
-import org.apache.kylin.engine.spark.utils.{FileNames, LogUtils}
-import org.apache.kylin.metadata.model.{NDataModel, NTableMetadataManager}
-import org.apache.kylin.metadata.project.NProjectManager
 import org.apache.commons.codec.digest.DigestUtils
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig
 import org.apache.kylin.common.persistence.transaction.UnitOfWork
-import org.apache.kylin.common.{KapConfig, KylinConfig}
 import org.apache.kylin.common.util.HadoopUtil
-import org.apache.kylin.metadata.model.{TableDesc, TableExtDesc}
+import org.apache.kylin.common.{KapConfig, KylinConfig}
+import org.apache.kylin.engine.spark.NSparkCubingEngine
+import org.apache.kylin.engine.spark.job.{DFChooser, KylinBuildEnv}
+import org.apache.kylin.engine.spark.utils.{FileNames, LogUtils}
+import org.apache.kylin.metadata.model.{NDataModel, NTableMetadataManager, TableDesc, TableExtDesc}
+import org.apache.kylin.metadata.project.NProjectManager
 import org.apache.kylin.source.SourceFactory
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils
-import org.apache.spark.sql.{Dataset, Encoders, Row, SparderEnv, SparkSession}
+import org.apache.spark.sql._
 import org.apache.spark.utils.ProxyThreadUtils
 
+import java.io.IOException
+import java.util
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Executors}
+import java.util.{Objects, UUID}
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future}
@@ -384,7 +383,7 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable {
     val resourcePath = baseDir + "/" + snapshotTablePath
     var hadoopConf = SparderEnv.getHadoopConfiguration()
     if (kylinConfig.getClusterManagerClassName.contains("AWSServerless")) {
-        hadoopConf = ss.sparkContext.hadoopConfiguration
+      hadoopConf = ss.sparkContext.hadoopConfiguration
     }
     val (repartitionNum, sizeMB) = try {
       val sizeInMB = ResourceDetectUtils.getPaths(sourceData.queryExecution.sparkPlan)
@@ -464,8 +463,12 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable {
       hadoopConf = sourceData.sparkSession.sparkContext.hadoopConfiguration
     }
     try {
+      var hadoopConf = SparderEnv.getHadoopConfiguration()
+      if (kylinConfig.getClusterManagerClassName.contains("AWSServerless")) {
+        hadoopConf = sourceData.sparkSession.sparkContext.hadoopConfiguration
+      }
       val sizeInMB = ResourceDetectUtils.getPaths(sourceData.queryExecution.sparkPlan)
-        .map(path => HadoopUtil.getContentSummary(path.getFileSystem(SparderEnv.getHadoopConfiguration()), path).getLength)
+        .map(path => HadoopUtil.getContentSummary(path.getFileSystem(hadoopConf), path).getLength)
         .sum * 1.0 / MB
       val num = Math.ceil(sizeInMB / KylinBuildEnv.get().kylinConfig.getSnapshotShardSizeMB).intValue()
       (num, sizeInMB)
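
Note on the sizing arithmetic in the last hunk: the byte lengths of the
source paths are summed, converted to megabytes, and divided by the
configured snapshot shard size to get the repartition count. A
self-contained sketch under two assumptions: MB is taken to be
1024 * 1024 (the constant's definition lies outside this diff), and
shardSizeMB stands in for kylinConfig.getSnapshotShardSizeMB:

    val MB: Long = 1024L * 1024L // assumed; defined elsewhere in the class

    // Sum the per-path byte counts, convert to MB, and round the shard
    // count up so no shard exceeds shardSizeMB.
    def shardCount(pathLengths: Seq[Long], shardSizeMB: Int): (Int, Double) = {
      val sizeInMB = pathLengths.sum * 1.0 / MB
      (Math.ceil(sizeInMB / shardSizeMB).toInt, sizeInMB)
    }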