Posted to commits@kylin.apache.org by zh...@apache.org on 2022/04/19 02:29:22 UTC
[kylin] 02/03: Pre-init KylinCacheFileSystem to fix s3a issue
This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch kylin-soft-affinity-local-cache
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit eddc015628fa1b07105a88421818203a1ee87c6a
Author: Zhichao Zhang <zh...@apache.org>
AuthorDate: Sat Sep 25 23:44:38 2021 +0800
Pre-init KylinCacheFileSystem to fix s3a issue
---
.../org/apache/kylin/common/KylinConfigBase.java | 7 ++++-
.../sql/execution/datasource/FilePruner.scala | 9 +++++++
.../org/apache/spark/sql/SparderContext.scala | 31 +++++++++++-----------
3 files changed, 31 insertions(+), 16 deletions(-)
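
Why pre-initialization helps: Hadoop caches FileSystem instances per (scheme, authority, UGI), so the first getFileSystem call for an s3a path pays the full client construction cost (credential providers, AWS client setup), and every later call reuses the cached instance. The sketch below illustrates the cache-warming idea with an illustrative bucket name; it assumes hadoop-aws is on the classpath and is not part of the commit.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()
    // The first call constructs and caches the s3a client; subsequent
    // getFileSystem calls for the same scheme and bucket are cheap.
    val warm: FileSystem = new Path("s3a://example-bucket/kylin").getFileSystem(conf)

Note that the cache can be switched off via fs.s3a.impl.disable.cache, in which case pre-initialization would not pay off.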
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 22e2d883e6..811b4bebf7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -37,6 +37,7 @@ import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrSubstitutor;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.annotation.ConfigTag;
@@ -298,6 +299,10 @@ public abstract class KylinConfigBase implements Serializable {
}
public String getHdfsWorkingDirectory() {
+ return getHdfsWorkingDirectoryInternal(HadoopUtil.getCurrentConfiguration());
+ }
+
+ public String getHdfsWorkingDirectoryInternal(Configuration hadoopConf) {
if (cachedHdfsWorkingDirectory != null)
return cachedHdfsWorkingDirectory;
@@ -308,7 +313,7 @@ public abstract class KylinConfigBase implements Serializable {
throw new IllegalArgumentException("kylin.env.hdfs-working-dir must be absolute, but got " + root);
try {
- FileSystem fs = path.getFileSystem(HadoopUtil.getCurrentConfiguration());
+ FileSystem fs = path.getFileSystem(hadoopConf);
path = fs.makeQualified(path);
} catch (IOException e) {
throw new RuntimeException(e);
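
The change above splits getHdfsWorkingDirectory() so the FileSystem backing the working directory can be resolved against a caller-supplied Hadoop Configuration instead of always using HadoopUtil.getCurrentConfiguration(); the no-arg method keeps its old behavior by delegating. A hypothetical call site (kylinConf and spark are assumed to be in scope, mirroring the SparderContext hunk further down):

    // Resolve the working directory with the Spark session's Hadoop
    // configuration, which carries the s3a settings.
    val workingDir: String =
      kylinConf.getHdfsWorkingDirectoryInternal(spark.sparkContext.hadoopConfiguration)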
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 0c5cfbff7b..34fc967c29 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -211,15 +211,20 @@ class FilePruner(cubeInstance: CubeInstance,
var cached = new java.util.HashMap[(Seq[Expression], Seq[Expression]), Seq[PartitionDirectory]]()
private def getFileStatusBySeg(seg: SegmentDirectory, fsc: FileStatusCache): SegmentDirectory = {
+ var startT = System.currentTimeMillis()
val path = new Path(toPath(seg.segmentName, seg.identifier))
val fs = path.getFileSystem(session.sparkContext.hadoopConfiguration)
+ logInfo(s"Get segment filesystem: ${System.currentTimeMillis() - startT}")
+ startT = System.currentTimeMillis()
if (fs.isDirectory(path) && fs.exists(path)) {
val maybeStatuses = fsc.getLeafFiles(path)
if (maybeStatuses.isDefined) {
+ logInfo(s"Get segment status from cache: ${System.currentTimeMillis() - startT}")
SegmentDirectory(seg.segmentName, seg.identifier, maybeStatuses.get)
} else {
val statuses = fs.listStatus(path)
fsc.putLeafFiles(path, statuses)
+ logInfo(s"Get segment status and cache: ${System.currentTimeMillis() - startT}")
SegmentDirectory(seg.segmentName, seg.identifier, statuses)
}
} else {
@@ -239,17 +244,21 @@ class FilePruner(cubeInstance: CubeInstance,
val timePartitionFilters = getSegmentFilter(dataFilters, timePartitionColumn)
logInfo(s"Applying time partition filters: ${timePartitionFilters.mkString(",")}")
+ var startT = System.currentTimeMillis()
val fsc = ShardFileStatusCache.getFileStatusCache(session)
+ logInfo(s"Get file status cache: ${System.currentTimeMillis() - startT}")
// segment pruning
var selected = afterPruning("segment", timePartitionFilters, segmentDirs) {
pruneSegments
}
+ startT = System.currentTimeMillis()
// fetch segment directories info in parallel
selected = selected.par.map(seg => {
getFileStatusBySeg(seg, fsc)
}).filter(_.files.nonEmpty).seq
+ logInfo(s"Get segment status: ${System.currentTimeMillis() - startT}")
// shards pruning
selected = afterPruning("shard", dataFilters, selected) {
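
The FilePruner hunks add elapsed-time logging (the logged values are milliseconds) around the filesystem lookups and fetch the per-segment directory listings in parallel via .par. The inline start/stop pattern could also be factored into a small helper; a sketch, not part of the commit (logInfo comes from Spark's Logging trait):

    def timed[T](label: String)(body: => T): T = {
      val start = System.currentTimeMillis()
      val result = body
      logInfo(s"$label: ${System.currentTimeMillis() - start} ms")
      result
    }

    // Usage, mirroring the commit's instrumentation:
    val fsc = timed("Get file status cache") {
      ShardFileStatusCache.getFileStatusCache(session)
    }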
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index faa921230d..e6b73d66cc 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -18,30 +18,28 @@
package org.apache.spark.sql
-import java.lang.{Boolean => JBoolean, String => JString}
-import java.nio.file.Paths
-
-import org.apache.spark.memory.MonitorEnv
-import org.apache.spark.util.Utils
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
-import org.apache.kylin.query.UdfManager
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.KylinSession._
-import java.util.concurrent.atomic.AtomicReference
-
import org.apache.commons.io.FileUtils
import org.apache.kylin.common.KylinConfig
import org.apache.kylin.common.util.ToolUtil
+import org.apache.kylin.query.UdfManager
import org.apache.kylin.query.monitor.SparderContextCanary
import org.apache.kylin.spark.classloader.ClassLoaderUtils
import org.apache.spark.deploy.StandaloneAppClient
-import org.apache.spark.sql.SparderContext.master_app_url
-import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MonitorEnv
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.KylinSession._
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasource.{KylinSourceStrategy, ShardFileStatusCache}
import org.apache.spark.sql.metrics.SparderMetricsListener
+import org.apache.spark.util.Utils
import org.apache.spark.utils.YarnInfoFetcherUtils
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
+
+import java.lang.{Boolean => JBoolean, String => JString}
+import java.nio.file.Paths
+import java.util.concurrent.atomic.AtomicReference
// scalastyle:off
object SparderContext extends Logging {
@@ -192,6 +190,9 @@ object SparderContext extends Logging {
case _ =>
master_app_url = YarnInfoFetcherUtils.getTrackingUrl(appid)
}
+
+ // pre-init FileSystem, fix s3a issue
+ kylinConf.getHdfsWorkingDirectoryInternal(spark.sparkContext.hadoopConfiguration)
} catch {
case throwable: Throwable =>
logError("Error for initializing spark ", throwable)