You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/02 21:01:18 UTC

[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4556: [HUDI-3191][Stacked on 4716] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r798007427



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,127 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
+   * lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
+   * as part of provided {@link JobConf}
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+   * partitions and then filtering based on the commits of interest, this logic first extracts the
+   * partitions touched by the desired commits and then lists only those partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> inputPaths,
+                                                          String incrementalTable) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
+                                                                      Stream<HoodieLogFile> logFiles,
+                                                                      Option<HoodieInstant> latestCompletedInstantOpt,
+                                                                      HoodieTableMetaClient tableMetaClient) {
+    List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+    FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
+    try {
+      RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
+      rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+      rtFileStatus.setBaseFilePath(baseFile.getPath());
+      rtFileStatus.setBasePath(tableMetaClient.getBasePath());
+
+      if (latestCompletedInstantOpt.isPresent()) {
+        HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
+        checkState(latestCompletedInstant.isCompleted());
+
+        rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
+      }
+
+      if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+        rtFileStatus.setBootStrapFileStatus(baseFileStatus);
+      }
+
+      return rtFileStatus;
+    } catch (IOException e) {
+      throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
+    }
+  }
+
+  @Nonnull
+  private List<FileStatus> listStatusForSnapshotModeLegacy(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap, List<Path> snapshotPaths) throws IOException {
+    return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapshotView());
+  }
+
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,

Review comment:
       Not sure i follow your suggestion, can you elaborate on what you have in mind to be included in this doc? 

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,126 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
+   * lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
+   * as part of provided {@link JobConf}
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+   * partitions and then filtering based on the commits of interest, this logic first extracts the
+   * partitions touched by the desired commits and then lists only those partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> inputPaths,
+                                                          String incrementalTable) throws IOException {
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, incrementalTable, timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
+                                                                      Stream<HoodieLogFile> logFiles,
+                                                                      Option<HoodieInstant> latestCompletedInstantOpt,
+                                                                      HoodieTableMetaClient tableMetaClient) {
+    List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());

Review comment:
       This is a new

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,127 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that
+   * lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified
+   * as part of provided {@link JobConf}
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+   * partitions and then filtering based on the commits of interest, this logic first extracts the
+   * partitions touched by the desired commits and then lists only those partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> inputPaths,
+                                                          String incrementalTable) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,

Review comment:
       Do you have something in mind?

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
##########
@@ -189,8 +190,10 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
 
       assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue()));
 
-      List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+      List<String> inputPaths = roView.getLatestBaseFiles()
+          .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+          .collect(Collectors.toList());

Review comment:
       Duplication isn't an issue b/c this list is only used for files filtering currently

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -187,13 +285,28 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
               .map(fileSlice -> {
                 Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
                 Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
-                if (baseFileOpt.isPresent()) {
-                  return getFileStatusUnchecked(baseFileOpt);
-                } else if (includeLogFilesForSnapShotView() && latestLogFileOpt.isPresent()) {
-                  return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), fileSlice.getLogFiles());
+                Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
+
+                Option<HoodieInstant> latestCompletedInstantOpt =
+                    fromScala(fileIndex.latestCompletedInstant());
+
+                // Check if we're reading a MOR table
+                if (includeLogFilesForSnapshotView()) {
+                  if (baseFileOpt.isPresent()) {
+                    return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, latestCompletedInstantOpt, tableMetaClient);

Review comment:
       A list of partition paths that are being read by Hive in snapshot mode (name is a carry-over from the previous code)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org