You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/02 04:51:14 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #4556: [HUDI-3191][Stacked on 4716] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

nsivabalan commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r797255058



##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
##########
@@ -189,8 +190,10 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
 
       assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue()));
 
-      List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+      List<String> inputPaths = roView.getLatestBaseFiles()
+          .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+          .collect(Collectors.toList());

Review comment:
       +1 

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -187,13 +285,28 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
               .map(fileSlice -> {
                 Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
                 Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
-                if (baseFileOpt.isPresent()) {
-                  return getFileStatusUnchecked(baseFileOpt);
-                } else if (includeLogFilesForSnapShotView() && latestLogFileOpt.isPresent()) {
-                  return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), fileSlice.getLogFiles());
+                Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
+
+                Option<HoodieInstant> latestCompletedInstantOpt =
+                    fromScala(fileIndex.latestCompletedInstant());
+
+                // Check if we're reading a MOR table
+                if (includeLogFilesForSnapshotView()) {
+                  if (baseFileOpt.isPresent()) {
+                    return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, latestCompletedInstantOpt, tableMetaClient);

Review comment:
       can you help me understand, what does a List<Path> snapshotPaths can refer to. can it represent both a base file and a log file, or only either of them ? 

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,126 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
+   * lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
+   * as part of provided {@link JobConf}
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+   * partitions and then filtering based on the commits of interest, this logic first extracts the
+   * partitions touched by the desired commits and then lists only those partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> inputPaths,
+                                                          String incrementalTable) throws IOException {
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, incrementalTable, timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
+                                                                      Stream<HoodieLogFile> logFiles,
+                                                                      Option<HoodieInstant> latestCompletedInstantOpt,
+                                                                      HoodieTableMetaClient tableMetaClient) {
+    List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());

Review comment:
       did we move this method from elsewhere or did you add it as part of this patch ? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org