You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/05/27 07:58:50 UTC

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2893: [HUDI-1371] [HUDI-1893] Support metadata based listing for Spark DataSource and Spark SQL

pengzhiwei2018 commented on a change in pull request #2893:
URL: https://github.com/apache/hudi/pull/2893#discussion_r640377103



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -151,13 +184,33 @@ case class HoodieFileIndex(
     metaClient.reloadActiveTimeline()
     val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
     fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles)
-    cachedAllInputFiles = fileSystemView.getLatestBaseFiles.iterator().asScala.toArray
-    cachedAllPartitionPaths = partitionFiles.keys.toSeq
-    cachedFileSize = cachedAllInputFiles.map(_.getFileLen).sum
+
+    (tableType, queryType) match {
+      case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) =>
+        // Fetch and store latest base and log files, and their sizes
+        cachedAllInputFiles = partitionFiles.map(p => {
+          val latestSlices = fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, activeInstants.lastInstant().get().getTimestamp)

Review comment:
       If no commit has succeeded yet, `activeInstants.lastInstant().get()` may crash the query, so we had better return an empty file list in that case.

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##########
@@ -107,34 +113,61 @@ case class HoodieFileIndex(
   }
 
   @transient @volatile private var fileSystemView: HoodieTableFileSystemView = _
-  @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] = _
+  @transient @volatile private var cachedAllInputFiles: Map[PartitionRowPath, Map[HoodieBaseFile, Seq[HoodieLogFile]]] = _
   @transient @volatile private var cachedFileSize: Long = 0L
-  @transient @volatile private var cachedAllPartitionPaths: Seq[PartitionRowPath] = _
 
   @volatile private var queryAsNonePartitionedTable: Boolean = _
 
   refresh0()
 
   override def rootPaths: Seq[Path] = queryPath :: Nil
 
+  /**
+   * Invoked by Spark to fetch list of latest base files per partition.
+   *
+   * @param partitionFilters partition column filters
+   * @param dataFilters data columns filters
+   * @return list of PartitionDirectory containing partition to base files mapping
+   */
   override def listFiles(partitionFilters: Seq[Expression],
                          dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table.
       Seq(PartitionDirectory(InternalRow.empty, allFiles))
     } else {
       // Prune the partition path by the partition filters
-      val prunedPartitions = prunePartition(cachedAllPartitionPaths, partitionFilters)
+      val prunedPartitions = prunePartition(cachedAllInputFiles.keys.toSeq, partitionFilters)
       prunedPartitions.map { partition =>
-        val fileStatues = fileSystemView.getLatestBaseFiles(partition.partitionPath).iterator()

Review comment:
       No! We cache the `files` in the `fileSystemView`, so each call of `listFiles` will reuse the cached values.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org