You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/11 19:20:12 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5722: [HUDI-4170] Make user can use hoodie.datasource.read.paths to read necessary files

alexeykudinkin commented on code in PR #5722:
URL: https://github.com/apache/hudi/pull/5722#discussion_r918262428


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -87,6 +85,16 @@ class DefaultSource extends RelationProvider
     } else {
       Seq.empty
     }
+
+    // Add default options for unspecified read options keys.
+    val parameters = if(globPaths.nonEmpty) {
+      Map(
+        "glob.paths" -> globPaths.mkString(",")
+      ) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)

Review Comment:
   Let's de-duplicate the addition of `DataSourceOptionsHelper.parametersWithReadDefaults(optParams)`



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -87,6 +85,16 @@ class DefaultSource extends RelationProvider
     } else {
       Seq.empty
     }
+
+    // Add default options for unspecified read options keys.
+    val parameters = if(globPaths.nonEmpty) {

Review Comment:
   nit: Spacing after if



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -296,6 +296,39 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertEquals("replacecommit", commits(1))
   }
 
+  @Test
+  def testReadPathsOnCopyOnWriteTable(): Unit = {
+    val records1 = dataGen.generateInsertsContainsAllPartitions("001", 20)
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val baseFilePath = fs.listStatus(new Path(basePath, dataGen.getPartitionPaths.head))

Review Comment:
   These aren't really base paths; they're partition paths



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -340,20 +341,51 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    */
   protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]
 
-  protected def listLatestBaseFiles(globbedPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
-    val partitionDirs = if (globbedPaths.isEmpty) {
+  /**
+   * Construct HoodieTableFileSystemView based on globPaths if specified, otherwise use the table path.
+   * Will perform pruning if necessary
+   */
+  private def getHoodieFsView(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): HoodieTableFileSystemView = {
+    val partitionDirs = if (globPaths.isEmpty) {
       fileIndex.listFiles(partitionFilters, dataFilters)
     } else {
-      val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globbedPaths)
+      val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths)
       inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
     }
 
-    val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
+    new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
+  }
+
+  /**
+   * Get all latest base files with partition paths, if globPaths is empty, will listing files
+   * under the table path.
+   */
+  protected def listLatestBaseFiles(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
+    val fsView = getHoodieFsView(globPaths, partitionFilters, dataFilters)
     val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus)
 
     latestBaseFiles.groupBy(getPartitionPath)
   }
 
+  /**
+   * Get all fileSlices(contains base files and log files if exist) from globPaths if not empty,
+   * otherwise will use the table path to do the listing.
+   */
+  protected def listLatestFileSlices(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
+    val fsView = getHoodieFsView(globPaths, partitionFilters, dataFilters)
+    val partitionPaths = fsView.getPartitionPaths.asScala
+
+    if (partitionPaths.isEmpty || latestInstant.isEmpty) {

Review Comment:
   If `latestInstant` is empty we can return even before we do the file-listing



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -340,20 +341,51 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    */
   protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]
 
-  protected def listLatestBaseFiles(globbedPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
-    val partitionDirs = if (globbedPaths.isEmpty) {
+  /**
+   * Construct HoodieTableFileSystemView based on globPaths if specified, otherwise use the table path.
+   * Will perform pruning if necessary
+   */
+  private def getHoodieFsView(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): HoodieTableFileSystemView = {

Review Comment:
   Let's rather modify this method to return a list of `PartitionDirectory`s -- it makes the intent and the cost of this method very clear (returning HFSV masks the fact that we're doing file-listing internally)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org