You are viewing a plain text version of this content; the canonical (hyperlinked) version is available in the original HTML rendering of this message.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/12 09:28:00 UTC
[kylin] 04/17: [DIRTY] fix partition snapshot build on spark serverless
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c14359ccd3b55704f4168b70cc9e8df89732b751
Author: xingjian <xi...@kyligence.io>
AuthorDate: Thu Nov 10 10:08:27 2022 +0800
[DIRTY] fix partition snapshot build on spark serverless
---
.../engine/spark/builder/SnapshotBuilder.scala | 31 ++++++++++++----------
1 file changed, 17 insertions(+), 14 deletions(-)
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
index ce5a080601..b077112498 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
@@ -18,29 +18,28 @@
package org.apache.kylin.engine.spark.builder
-import java.io.IOException
-import java.util
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Executors}
-import java.util.{Objects, UUID}
import com.google.common.collect.Maps
-import org.apache.kylin.engine.spark.NSparkCubingEngine
-import org.apache.kylin.engine.spark.job.{DFChooser, KylinBuildEnv}
-import org.apache.kylin.engine.spark.utils.{FileNames, LogUtils}
-import org.apache.kylin.metadata.model.{NDataModel, NTableMetadataManager}
-import org.apache.kylin.metadata.project.NProjectManager
import org.apache.commons.codec.digest.DigestUtils
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig
import org.apache.kylin.common.persistence.transaction.UnitOfWork
-import org.apache.kylin.common.{KapConfig, KylinConfig}
import org.apache.kylin.common.util.HadoopUtil
-import org.apache.kylin.metadata.model.{TableDesc, TableExtDesc}
+import org.apache.kylin.common.{KapConfig, KylinConfig}
+import org.apache.kylin.engine.spark.NSparkCubingEngine
+import org.apache.kylin.engine.spark.job.{DFChooser, KylinBuildEnv}
+import org.apache.kylin.engine.spark.utils.{FileNames, LogUtils}
+import org.apache.kylin.metadata.model.{NDataModel, NTableMetadataManager, TableDesc, TableExtDesc}
+import org.apache.kylin.metadata.project.NProjectManager
import org.apache.kylin.source.SourceFactory
import org.apache.spark.internal.Logging
import org.apache.spark.sql.hive.utils.ResourceDetectUtils
-import org.apache.spark.sql.{Dataset, Encoders, Row, SparderEnv, SparkSession}
+import org.apache.spark.sql._
import org.apache.spark.utils.ProxyThreadUtils
+import java.io.IOException
+import java.util
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Executors}
+import java.util.{Objects, UUID}
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
@@ -384,7 +383,7 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable {
val resourcePath = baseDir + "/" + snapshotTablePath
var hadoopConf = SparderEnv.getHadoopConfiguration()
if (kylinConfig.getClusterManagerClassName.contains("AWSServerless")) {
- hadoopConf = ss.sparkContext.hadoopConfiguration
+ hadoopConf = ss.sparkContext.hadoopConfiguration
}
val (repartitionNum, sizeMB) = try {
val sizeInMB = ResourceDetectUtils.getPaths(sourceData.queryExecution.sparkPlan)
@@ -464,8 +463,12 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable {
hadoopConf = sourceData.sparkSession.sparkContext.hadoopConfiguration
}
try {
+ var hadoopConf = SparderEnv.getHadoopConfiguration()
+ if (kylinConfig.getClusterManagerClassName.contains("AWSServerless")) {
+ hadoopConf = sourceData.sparkSession.sparkContext.hadoopConfiguration
+ }
val sizeInMB = ResourceDetectUtils.getPaths(sourceData.queryExecution.sparkPlan)
- .map(path => HadoopUtil.getContentSummary(path.getFileSystem(SparderEnv.getHadoopConfiguration()), path).getLength)
+ .map(path => HadoopUtil.getContentSummary(path.getFileSystem(hadoopConf), path).getLength)
.sum * 1.0 / MB
val num = Math.ceil(sizeInMB / KylinBuildEnv.get().kylinConfig.getSnapshotShardSizeMB).intValue()
(num, sizeInMB)