You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/02/07 14:24:00 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4892 Reduce the times of fetching files status from HDFS Namenode in FilePruner

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new b9cd81a  KYLIN-4892 Reduce the times of fetching files status from HDFS Namenode in FilePruner
b9cd81a is described below

commit b9cd81a0b1710913af469b04d44f3451d6a87f0c
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Mon Feb 1 16:57:02 2021 +0800

    KYLIN-4892 Reduce the times of fetching files status from HDFS Namenode in FilePruner
    
    (cherry picked from commit 6e4d94d1c027d5877eb3013f37ef223aa0532cc2)
    (cherry picked from commit edebb98ca33e1f3ddf000842f12d0bff45109c57)
---
 .../sql/execution/datasource/FilePruner.scala      | 109 +++++++++++----------
 .../org/apache/spark/sql/SparderContext.scala      |   5 +-
 .../engine/spark2/NBadQueryAndPushDownTest.java    |   2 +-
 3 files changed, 62 insertions(+), 54 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 9f62f52..f0f7916 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -51,10 +51,9 @@ case class SegmentDirectory(segmentName: String, identifier: String, files: Seq[
  * @param shardColumnNames the names of the columns that used to generate the shard id.
  * @param sortColumnNames  the names of the columns that used to sort data in each shard.
  */
-case class ShardSpec(
-                      numShards: Int,
-                      shardColumnNames: Seq[String],
-                      sortColumnNames: Seq[String]) {
+case class ShardSpec(numShards: Int,
+                     shardColumnNames: Seq[String],
+                     sortColumnNames: Seq[String]) {
 
   if (numShards <= 0) {
     throw new AnalysisException(
@@ -72,28 +71,17 @@ case class ShardSpec(
   }
 }
 
-class FilePruner(
-                  cubeInstance: CubeInstance,
-                  cuboid: Cuboid,
-                  val session: SparkSession,
-                  val options: Map[String, String])
+class FilePruner(cubeInstance: CubeInstance,
+                 cuboid: Cuboid,
+                 val session: SparkSession,
+                 val options: Map[String, String])
   extends FileIndex with ResetShufflePartition with Logging {
 
   private lazy val segmentDirs: Seq[SegmentDirectory] = {
     cubeInstance.getSegments.asScala
       .filter(_.getStatus.equals(SegmentStatusEnum.READY)).map(seg => {
-      val segName = seg.getName
-      val path = PathManager.getParquetStoragePath(cubeInstance, segName, seg.getStorageLocationIdentifier, layoutEntity.getId)
-      val files = new InMemoryFileIndex(session,
-        Seq(new Path(path)),
-        options,
-        Some(dataSchema),
-        FileStatusCache.getOrCreate(session))
-        .listFiles(Nil, Nil)
-        .flatMap(_.files)
-        .filter(_.isFile)
-      SegmentDirectory(segName, seg.getStorageLocationIdentifier, files)
-    }).filter(_.files.nonEmpty)
+      SegmentDirectory(seg.getName, seg.getStorageLocationIdentifier, Nil)
+    })
   }
 
   val layoutEntity = MetadataConverter.toLayoutEntity(cubeInstance, cuboid)
@@ -103,7 +91,8 @@ class FilePruner(
       .map { column => StructField(column.id.toString, column.dataType) }
       .toSeq ++
       layoutEntity.getOrderedMeasures.asScala
-        .map { entry => StructField(entry._1.toString, SparkTypeUtil.generateFunctionReturnDataType(entry._2)) }
+        .map { entry =>
+          StructField(entry._1.toString, SparkTypeUtil.generateFunctionReturnDataType(entry._2)) }
         .toSeq)
   }
 
@@ -115,7 +104,6 @@ class FilePruner(
     PathManager.getParquetStoragePath(cubeInstance, segmentName, identifier, cuboid.getId)
   }
 
-
   override lazy val partitionSchema: StructType = {
     // we did not use the partitionBy mechanism of spark
     new StructType()
@@ -130,7 +118,8 @@ class FilePruner(
         val ref = desc.getPartitionDateColumnRef
         // only consider partition date column
         // we can only get col ID in layout cuz data schema is all ids.
-        val id = layoutEntity.getOrderedDimensions.asScala.values.find(column => column.columnName.equals(ref.getName))
+        val id = layoutEntity.getOrderedDimensions.asScala.values.find(
+          column => column.columnName.equals(ref.getName))
         if (id.isDefined && (ref.getType.isDateTimeFamily || ref.getType.isStringFamily)) {
           pattern = desc.getPartitionDateFormat
           dataSchema.filter(_.name == String.valueOf(id.get.id))
@@ -208,12 +197,32 @@ class FilePruner(
 
   var cached = new java.util.HashMap[(Seq[Expression], Seq[Expression]), Seq[PartitionDirectory]]()
 
-  override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+  private def getFileStatusBySeg(seg: SegmentDirectory, fsc: FileStatusCache): SegmentDirectory = {
+    val path = new Path(toPath(seg.segmentName, seg.identifier))
+    val fs = path.getFileSystem(session.sparkContext.hadoopConfiguration)
+    if (fs.isDirectory(path) && fs.exists(path)) {
+      val maybeStatuses = fsc.getLeafFiles(path)
+      if (maybeStatuses.isDefined) {
+        SegmentDirectory(seg.segmentName, seg.identifier, maybeStatuses.get)
+      } else {
+        val statuses = fs.listStatus(path)
+        fsc.putLeafFiles(path, statuses)
+        SegmentDirectory(seg.segmentName, seg.identifier, statuses)
+      }
+    } else {
+      logWarning(s"Segment path ${path.toString} not exists.")
+      SegmentDirectory(seg.segmentName, seg.identifier, Nil)
+    }
+  }
+
+  override def listFiles(partitionFilters: Seq[Expression],
+                         dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     if (cached.containsKey((partitionFilters, dataFilters))) {
       return cached.get((partitionFilters, dataFilters))
     }
 
     require(isResolved)
+    val startTime = System.nanoTime
     val timePartitionFilters = getSpecFilter(dataFilters, timePartitionColumn)
     logInfo(s"Applying time partition filters: ${timePartitionFilters.mkString(",")}")
 
@@ -223,19 +232,12 @@ class FilePruner(
     var selected = afterPruning("segment", timePartitionFilters, segmentDirs) {
       pruneSegments
     }
-    //    QueryContextFacade.current().record("seg_pruning")
-    selected = selected.par.map { e =>
-      val path = new Path(toPath(e.segmentName, e.identifier))
-      val maybeStatuses = fsc.getLeafFiles(path)
-      if (maybeStatuses.isDefined) {
-        SegmentDirectory(e.segmentName, e.identifier, maybeStatuses.get)
-      } else {
-        val statuses = path.getFileSystem(session.sparkContext.hadoopConfiguration).listStatus(path)
-        fsc.putLeafFiles(path, statuses)
-        SegmentDirectory(e.segmentName, e.identifier, statuses)
-      }
-    }.toIterator.toSeq
-    //    QueryContextFacade.current().record("fetch_file_status")
+
+    // fetch segment directories info in parallel
+    selected = selected.par.map(seg => {
+      getFileStatusBySeg(seg, fsc)
+    }).filter(_.files.nonEmpty).seq
+
     // shards pruning
     selected = afterPruning("shard", dataFilters, selected) {
       pruneShards
@@ -246,6 +248,7 @@ class FilePruner(
     val totalFileSize = selected.flatMap(partition => partition.files).map(_.getLen).sum
     logInfo(s"totalFileSize is ${totalFileSize}")
     setShufflePartitions(totalFileSize, session)
+    logInfo(s"Files pruning in ${(System.nanoTime() - startTime).toDouble / 1000000} ms")
     if (selected.isEmpty) {
       val value = Seq.empty[PartitionDirectory]
       cached.put((partitionFilters, dataFilters), value)
@@ -255,11 +258,12 @@ class FilePruner(
       cached.put((partitionFilters, dataFilters), value)
       value
     }
-
   }
 
-  private def afterPruning(pruningType: String, specFilters: Seq[Expression], inputs: Seq[SegmentDirectory])
-                          (pruningFunc: (Seq[Expression], Seq[SegmentDirectory]) => Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
+  private def afterPruning(pruningType: String, specFilters: Seq[Expression],
+                           inputs: Seq[SegmentDirectory])
+                          (pruningFunc: (Seq[Expression], Seq[SegmentDirectory]) =>
+                            Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
     if (specFilters.isEmpty) {
       inputs
     } else {
@@ -281,9 +285,8 @@ class FilePruner(
     dataFilters.filter(_.references.subsetOf(AttributeSet(col)))
   }
 
-  private def pruneSegments(
-                             filters: Seq[Expression],
-                             segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
+  private def pruneSegments(filters: Seq[Expression],
+                            segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
 
     val filteredStatuses = if (filters.isEmpty) {
       segDirs
@@ -298,7 +301,8 @@ class FilePruner(
         val pruned = segDirs.filter {
           e => {
             val tsRange = cubeInstance.getSegment(e.segmentName, SegmentStatusEnum.READY).getTSRange
-            SegFilters(tsRange.startValue, tsRange.endValue, pattern).foldFilter(reducedFilter) match {
+            SegFilters(tsRange.startValue, tsRange.endValue, pattern)
+              .foldFilter(reducedFilter) match {
               case Trivial(true) => true
               case Trivial(false) => false
             }
@@ -312,7 +316,7 @@ class FilePruner(
   }
 
   private def pruneShards(filters: Seq[Expression],
-                           segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
+                          segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
     val filteredStatuses = if (layoutEntity.getShardByColumns.size() != 1) {
       segDirs
     } else {
@@ -323,7 +327,8 @@ class FilePruner(
         val partitionNumber = segment.getCuboidShardNum(layoutEntity.getId).toInt
         require(partitionNumber > 0, "Shards num with shard by col should greater than 0.")
 
-        val bitSet = getExpressionShards(normalizedFiltersAndExpr, shardByColumn.name, partitionNumber)
+        val bitSet = getExpressionShards(normalizedFiltersAndExpr, shardByColumn.name,
+          partitionNumber)
 
         val selected = files.filter(f => {
           val partitionId = FilePruner.getPartitionId(f.getPath)
@@ -331,7 +336,8 @@ class FilePruner(
         })
         SegmentDirectory(segName, segIdentifier, selected)
       }
-      logInfo(s"Selected files after shards pruning:" + pruned.flatMap(_.files).map(_.getPath.toString).mkString(";"))
+      logInfo(s"Selected files after shards pruning:" + pruned.flatMap(_.files)
+        .map(_.getPath.toString).mkString(";"))
       pruned
     }
     filteredStatuses
@@ -348,10 +354,9 @@ class FilePruner(
 
   override def refresh(): Unit = {}
 
-  private def getExpressionShards(
-                                   expr: Expression,
-                                   shardColumnName: String,
-                                   numShards: Int): BitSet = {
+  private def getExpressionShards(expr: Expression,
+                                  shardColumnName: String,
+                                  numShards: Int): BitSet = {
 
     def getShardNumber(attr: Attribute, v: Any): Int = {
       BucketingUtils.getBucketIdFromValue(attr, numShards, v)
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index c3d089d..aaec4d5 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -36,7 +36,7 @@ import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.query.monitor.SparderContextCanary
 import org.apache.kylin.spark.classloader.ClassLoaderUtils
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
-import org.apache.spark.sql.execution.datasource.KylinSourceStrategy
+import org.apache.spark.sql.execution.datasource.{KylinSourceStrategy, ShardFileStatusCache}
 import org.apache.spark.sql.metrics.SparderMetricsListener
 import org.apache.spark.utils.YarnInfoFetcherUtils
 
@@ -202,6 +202,9 @@ object SparderContext extends Logging {
         //monitor sparder
         SparderContextCanary.init()
       }
+
+      // init FileStatusCache
+      ShardFileStatusCache.getFileStatusCache(getOriginalSparkSession)
     }
   }
 
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBadQueryAndPushDownTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBadQueryAndPushDownTest.java
index 3e50415..744922a 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBadQueryAndPushDownTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBadQueryAndPushDownTest.java
@@ -92,7 +92,7 @@ public class NBadQueryAndPushDownTest extends LocalWithSparkSessionTest {
 
     @Test
     public void testPushDownForFileNotExist() throws Exception {
-        final String sql = "select max(price) from test_kylin_fact";
+        final String sql = "select max(ITEM_COUNT) from test_kylin_fact";
         KylinConfig.getInstanceFromEnv().setProperty(PUSHDOWN_RUNNER_KEY,
                 "org.apache.kylin.query.pushdown.PushDownRunnerSparkImpl");
         try {