Posted to commits@kylin.apache.org by zh...@apache.org on 2022/04/19 02:29:22 UTC

[kylin] 02/03: Pre-init KylinCacheFileSystem to fix s3a issue

This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch kylin-soft-affinity-local-cache
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit eddc015628fa1b07105a88421818203a1ee87c6a
Author: Zhichao Zhang <zh...@apache.org>
AuthorDate: Sat Sep 25 23:44:38 2021 +0800

    Pre-init KylinCacheFileSystem to fix s3a issue
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  7 ++++-
 .../sql/execution/datasource/FilePruner.scala      |  9 +++++++
 .../org/apache/spark/sql/SparderContext.scala      | 31 +++++++++++-----------
 3 files changed, 31 insertions(+), 16 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 22e2d883e6..811b4bebf7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -37,6 +37,7 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.text.StrSubstitutor;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.annotation.ConfigTag;
@@ -298,6 +299,10 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     public String getHdfsWorkingDirectory() {
+        return getHdfsWorkingDirectoryInternal(HadoopUtil.getCurrentConfiguration());
+    }
+
+    public String getHdfsWorkingDirectoryInternal(Configuration hadoopConf) {
         if (cachedHdfsWorkingDirectory != null)
             return cachedHdfsWorkingDirectory;
 
@@ -308,7 +313,7 @@ public abstract class KylinConfigBase implements Serializable {
             throw new IllegalArgumentException("kylin.env.hdfs-working-dir must be absolute, but got " + root);
 
         try {
-            FileSystem fs = path.getFileSystem(HadoopUtil.getCurrentConfiguration());
+            FileSystem fs = path.getFileSystem(hadoopConf);
             path = fs.makeQualified(path);
         } catch (IOException e) {
             throw new RuntimeException(e);
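
The refactoring above lets callers qualify the working-directory path against an explicitly supplied Hadoop Configuration instead of the ambient HadoopUtil.getCurrentConfiguration(). That matters because Hadoop caches FileSystem instances per scheme and authority, so the Configuration that triggers the first lookup determines how the cached instance is initialized; for an s3a:// working directory, that first Configuration needs to carry the s3a settings. A minimal self-contained sketch of the same qualification step (the object name, property value, and paths are hypothetical, not from this commit):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    object WorkingDirQualifier {
      // Qualify a root path against an explicitly supplied Configuration,
      // mirroring getHdfsWorkingDirectoryInternal(Configuration).
      def qualify(root: String, hadoopConf: Configuration): String = {
        val path = new Path(root)
        // Resolves the FileSystem for this scheme and authority using
        // exactly the Configuration we pass in, and leaves it in the
        // JVM-wide FileSystem cache for later lookups.
        val fs = path.getFileSystem(hadoopConf)
        fs.makeQualified(path).toString
      }

      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        // Hypothetical setting; real deployments pick up s3a config from
        // core-site.xml or the Spark session's hadoopConfiguration.
        conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        println(qualify("file:///tmp/kylin", conf))
      }
    }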
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 0c5cfbff7b..34fc967c29 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -211,15 +211,20 @@ class FilePruner(cubeInstance: CubeInstance,
   var cached = new java.util.HashMap[(Seq[Expression], Seq[Expression]), Seq[PartitionDirectory]]()
 
   private def getFileStatusBySeg(seg: SegmentDirectory, fsc: FileStatusCache): SegmentDirectory = {
+    var startT = System.currentTimeMillis()
     val path = new Path(toPath(seg.segmentName, seg.identifier))
     val fs = path.getFileSystem(session.sparkContext.hadoopConfiguration)
+    logInfo(s"Get segment filesystem: ${System.currentTimeMillis() - startT}")
+    startT = System.currentTimeMillis()
     if (fs.isDirectory(path) && fs.exists(path)) {
       val maybeStatuses = fsc.getLeafFiles(path)
       if (maybeStatuses.isDefined) {
+        logInfo(s"Get segment status from cache: ${System.currentTimeMillis() - startT}")
         SegmentDirectory(seg.segmentName, seg.identifier, maybeStatuses.get)
       } else {
         val statuses = fs.listStatus(path)
         fsc.putLeafFiles(path, statuses)
+        logInfo(s"Get segment status and cache: ${System.currentTimeMillis() - startT}")
         SegmentDirectory(seg.segmentName, seg.identifier, statuses)
       }
     } else {
@@ -239,17 +244,21 @@ class FilePruner(cubeInstance: CubeInstance,
     val timePartitionFilters = getSegmentFilter(dataFilters, timePartitionColumn)
     logInfo(s"Applying time partition filters: ${timePartitionFilters.mkString(",")}")
 
+    var startT = System.currentTimeMillis()
     val fsc = ShardFileStatusCache.getFileStatusCache(session)
+    logInfo(s"Get file status cache: ${System.currentTimeMillis() - startT}")
 
     // segment pruning
     var selected = afterPruning("segment", timePartitionFilters, segmentDirs) {
       pruneSegments
     }
 
+    startT = System.currentTimeMillis()
     // fetch segment directories info in parallel
     selected = selected.par.map(seg => {
       getFileStatusBySeg(seg, fsc)
     }).filter(_.files.nonEmpty).seq
+    logInfo(s"Get segment status: ${System.currentTimeMillis() - startT}")
 
     // shards pruning
     selected = afterPruning("shard", dataFilters, selected) {
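
The changes above add coarse elapsed-time logging around the filesystem and status-cache calls: capture System.currentTimeMillis() before the call, log the delta after it. The same pattern can be factored into a small helper; this sketch is illustrative (the timed helper is not part of the commit) and uses println where FilePruner uses Spark's logInfo:

    object TimingUtil {
      // Run `body`, report how long it took in milliseconds, and return
      // its result. FilePruner inlines this pattern with logInfo.
      def timed[T](label: String)(body: => T): T = {
        val startT = System.currentTimeMillis()
        val result = body
        println(s"$label: ${System.currentTimeMillis() - startT} ms")
        result
      }

      def main(args: Array[String]): Unit = {
        val entries = timed("List working directory") {
          new java.io.File(".").listFiles().toSeq
        }
        println(s"${entries.size} entries")
      }
    }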
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index faa921230d..e6b73d66cc 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -18,30 +18,28 @@
 
 package org.apache.spark.sql
 
-import java.lang.{Boolean => JBoolean, String => JString}
-import java.nio.file.Paths
-
-import org.apache.spark.memory.MonitorEnv
-import org.apache.spark.util.Utils
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
-import org.apache.kylin.query.UdfManager
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.KylinSession._
-import java.util.concurrent.atomic.AtomicReference
-
 import org.apache.commons.io.FileUtils
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.ToolUtil
+import org.apache.kylin.query.UdfManager
 import org.apache.kylin.query.monitor.SparderContextCanary
 import org.apache.kylin.spark.classloader.ClassLoaderUtils
 import org.apache.spark.deploy.StandaloneAppClient
-import org.apache.spark.sql.SparderContext.master_app_url
-import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MonitorEnv
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.KylinSession._
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasource.{KylinSourceStrategy, ShardFileStatusCache}
 import org.apache.spark.sql.metrics.SparderMetricsListener
+import org.apache.spark.util.Utils
 import org.apache.spark.utils.YarnInfoFetcherUtils
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
+
+import java.lang.{Boolean => JBoolean, String => JString}
+import java.nio.file.Paths
+import java.util.concurrent.atomic.AtomicReference
 
 // scalastyle:off
 object SparderContext extends Logging {
@@ -192,6 +190,9 @@ object SparderContext extends Logging {
                 case _ =>
                   master_app_url = YarnInfoFetcherUtils.getTrackingUrl(appid)
               }
+
+              // pre-init FileSystem, fix s3a issue
+              kylinConf.getHdfsWorkingDirectoryInternal(spark.sparkContext.hadoopConfiguration)
             } catch {
               case throwable: Throwable =>
                 logError("Error for initializing spark ", throwable)
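
The functional change in this file is the single pre-init call added after session startup: it forces the working-directory FileSystem (KylinCacheFileSystem over s3a in this branch) to be created with spark.sparkContext.hadoopConfiguration and left in Hadoop's JVM-wide FileSystem cache, so a later lookup made with a plainer Configuration cannot be the one that initializes it. A standalone sketch of that warm-up idea, under the assumption that populating the cache at startup is the intent (the object name, session settings, and path are hypothetical):

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.SparkSession

    object FileSystemWarmUp {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("fs-warm-up")
          .getOrCreate()

        // In a real deployment this would be the s3a working directory
        // from kylin.env.hdfs-working-dir; a local path keeps the sketch
        // runnable without credentials.
        val workingDir = new Path(args.headOption.getOrElse("file:///tmp/kylin"))

        // First touch: initializes the FileSystem with Spark's Hadoop
        // configuration and caches it per scheme + authority, so later
        // lookups reuse this correctly configured instance.
        val fs = workingDir.getFileSystem(spark.sparkContext.hadoopConfiguration)
        println(s"Pre-initialized filesystem: ${fs.getUri}")

        spark.stop()
      }
    }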