Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/25 02:42:35 UTC

[GitHub] [hudi] yihua commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

yihua commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791234892



##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##########
@@ -268,11 +281,13 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro
         thirdClient.startCommitWithTime(newCommitTime);
 
         writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);
+
         statuses = writeStatusJavaRDD.collect();
-        thirdClient.commit(newCommitTime, writeStatusJavaRDD);
         // Verify there are no errors
         assertNoWriteErrors(statuses);
 
+        thirdClient.commit(newCommitTime, jsc().parallelize(statuses));
+

Review comment:
       No need to change these?

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
##########
@@ -213,8 +214,11 @@ public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws E
       dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent());
 
-      List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath(), new JobConf(hadoopConf()), true, false);
+      List<String> inputPaths = tableView.getLatestBaseFiles()
+          .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+          .collect(Collectors.toList());

Review comment:
       Same here

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##########
@@ -166,10 +169,12 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro
       JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
 
       JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
-      client.commit(newCommitTime, writeStatusJavaRDD);
+
       List<WriteStatus> statuses = writeStatusJavaRDD.collect();
       assertNoWriteErrors(statuses);
 
+      client.commit(newCommitTime, jsc().parallelize(statuses));
+

Review comment:
       No need to change these lines?

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##########
@@ -201,8 +206,10 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro
         copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
         copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
 
-        List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-        List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+        List<String> inputPaths = tableView.getLatestBaseFiles()
+            .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+            .collect(Collectors.toList());

Review comment:
       Same here for duplicated partition paths and below.

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
##########
@@ -189,8 +190,10 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
 
       assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue()));
 
-      List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+      List<String> inputPaths = roView.getLatestBaseFiles()
+          .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+          .collect(Collectors.toList());

Review comment:
       This can include duplicated partition paths.  Should `Set<String>` be used instead?
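       A minimal sketch of the suggestion, reusing the stream from the diff above (`roView` and the imports are assumed to be as in the test):

       ```java
       // Collecting into a Set drops the duplicates, since several base files
       // can live under the same partition path.
       Set<String> inputPaths = roView.getLatestBaseFiles()
           .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
           .collect(Collectors.toSet());
       ```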

##########
File path: hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala
##########
@@ -87,6 +89,12 @@ abstract class HoodieTableFileIndexBase(engineContext: HoodieEngineContext,
 
   refresh0()
 
+  /**
+   * Returns latest completed instant as seen by this instance of the file-index
+   */
+  def latestCompletedInstant(): Option[HoodieInstant] =

Review comment:
       nit: rename to `getLatestCompletedInstant`?

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BootstrapBaseFileSplit.java
##########
@@ -32,10 +32,6 @@
 
   private FileSplit bootstrapFileSplit;
 
-  public BootstrapBaseFileSplit() {
-    super();
-  }

Review comment:
       Are we sure this is not used outside the Hudi repo by code in query engines such as Presto/Trino?  If we're not certain, we can keep it as is.
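       For context, a common reason such no-arg constructors exist is Hadoop's reflective deserialization of splits; a simplified sketch of that path (not the exact Presto/Trino code):

       ```java
       // Engines typically rehydrate a split roughly like this:
       //   InputSplit split = ReflectionUtils.newInstance(splitClass, conf);
       //   split.readFields(in);
       // ReflectionUtils.newInstance needs a no-arg constructor, so removing
       // it can break any external consumer that ships this split across nodes.
       public BootstrapBaseFileSplit() {
         super();
       }
       ```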

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,127 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes the {@link FileInputFormat#listStatus(JobConf)} operation to subclasses,
+   * listing files (returned as an array of {@link FileStatus}) that correspond to the input paths
+   * specified as part of the provided {@link JobConf}
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+   * partitions and then filtering based on the commits of interest, this logic first extracts the
+   * partitions touched by the desired commits and then lists only those partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> inputPaths,
+                                                          String incrementalTable) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
+                                                                      Stream<HoodieLogFile> logFiles,
+                                                                      Option<HoodieInstant> latestCompletedInstantOpt,
+                                                                      HoodieTableMetaClient tableMetaClient) {
+    List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+    FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
+    try {
+      RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
+      rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+      rtFileStatus.setBaseFilePath(baseFile.getPath());
+      rtFileStatus.setBasePath(tableMetaClient.getBasePath());
+
+      if (latestCompletedInstantOpt.isPresent()) {
+        HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
+        checkState(latestCompletedInstant.isCompleted());
+
+        rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
+      }
+
+      if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+        rtFileStatus.setBootStrapFileStatus(baseFileStatus);
+      }
+
+      return rtFileStatus;
+    } catch (IOException e) {
+      throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
+    }
+  }
+
+  @Nonnull
+  private List<FileStatus> listStatusForSnapshotModeLegacy(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap, List<Path> snapshotPaths) throws IOException {
+    return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapshotView());
+  }
+
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,
+                                                                      Stream<HoodieLogFile> logFiles,
+                                                                      Option<HoodieInstant> latestCompletedInstantOpt,
+                                                                      HoodieTableMetaClient tableMetaClient) {
+    List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+    try {
+      RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus());
+      rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+      rtFileStatus.setBasePath(tableMetaClient.getBasePath());
+
+      if (latestCompletedInstantOpt.isPresent()) {
+        HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
+        checkState(latestCompletedInstant.isCompleted());

Review comment:
       Do we need to keep this for production?

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,127 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes the {@link FileInputFormat#listStatus(JobConf)} operation to subclasses,
+   * listing files (returned as an array of {@link FileStatus}) that correspond to the input paths
+   * specified as part of the provided {@link JobConf}
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+   * partitions and then filtering based on the commits of interest, this logic first extracts the
+   * partitions touched by the desired commits and then lists only those partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> inputPaths,
+                                                          String incrementalTable) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
+                                                                      Stream<HoodieLogFile> logFiles,
+                                                                      Option<HoodieInstant> latestCompletedInstantOpt,
+                                                                      HoodieTableMetaClient tableMetaClient) {
+    List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+    FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
+    try {
+      RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
+      rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+      rtFileStatus.setBaseFilePath(baseFile.getPath());
+      rtFileStatus.setBasePath(tableMetaClient.getBasePath());
+
+      if (latestCompletedInstantOpt.isPresent()) {
+        HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
+        checkState(latestCompletedInstant.isCompleted());
+
+        rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
+      }
+
+      if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+        rtFileStatus.setBootStrapFileStatus(baseFileStatus);
+      }
+
+      return rtFileStatus;
+    } catch (IOException e) {
+      throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
+    }
+  }
+
+  @Nonnull
+  private List<FileStatus> listStatusForSnapshotModeLegacy(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap, List<Path> snapshotPaths) throws IOException {
+    return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapshotView());
+  }
+
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,

Review comment:
       nit: add docs to explain what's included in the returned file status?
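       A possible Javadoc sketch, based on what the method body above actually sets (wording is illustrative):

       ```java
       /**
        * Creates a {@link RealtimeFileStatus} for a file-slice that has log files
        * but no base file. The returned status wraps the latest log file's status
        * and carries the delta log files (sorted by the log-file comparator), the
        * table's base path, and, when present, the latest completed instant's
        * timestamp as the max commit time.
        */
       ```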

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java
##########
@@ -31,7 +31,7 @@
  */
 public class PathWithLogFilePath extends Path {
   // a flag to mark this split is produced by incremental query or not.
-  private boolean belongToIncrementalPath = false;
+  private boolean belongsToIncrementalPath = false;

Review comment:
       nit: I see quite a bit of renaming in this PR.  Can the renames be separated out into their own PR, apart from the refactoring?

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,124 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {

Review comment:
       Are most methods in this class reordered?  Why not keep the same ordering so it's easier to check the changes?

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
##########
@@ -44,9 +44,7 @@
 
   private Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
 
-  public HoodieRealtimeFileSplit() {
-    super();

Review comment:
       Do we still need to call the superclass constructor?
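       For reference, the explicit `super()` is redundant in a no-arg constructor: javac inserts it implicitly, so the form with `super()` compiles to the same bytecode as an empty body. The real question is whether the no-arg constructor can be dropped at all:

       ```java
       public HoodieRealtimeFileSplit() {
         // Equivalent to `public HoodieRealtimeFileSplit() { super(); }`.
         // Once other constructors are declared, the default constructor
         // disappears, which breaks reflective (Writable) instantiation.
       }
       ```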

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,71 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    if (fileSplits.isEmpty()) {
+      return new InputSplit[0];
+    }
+
+    FileSplit fileSplit = fileSplits.get(0);
+
+    // Pre-process table-config to fetch virtual key info
+    Path partitionPath = fileSplit.getPath().getParent();
+    HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
+
+    // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+    HoodieInstant latestCommitInstant =
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+    InputSplit[] finalSplits = fileSplits.stream()
+      .map(split -> {
+        // There are 4 types of splits we could have to handle here
+        //    - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file,
+        //      but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit})
+        //    - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file
+        //      and does have log files appended
+        //    - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file
+        //       and does have log files appended;
+        //    - {@code FileSplit}: in case Hive passed down non-Hudi path
+        //
+        if (split instanceof RealtimeBootstrapBaseFileSplit) {
+          return split;
+        } else if (split instanceof BootstrapBaseFileSplit) {
+          BootstrapBaseFileSplit bootstrapBaseFileSplit = unsafeCast(split);
+          return createRealtimeBoostrapBaseFileSplit(
+              bootstrapBaseFileSplit,
+              metaClient.getBasePath(),
+              Collections.emptyList(),
+              latestCommitInstant.getTimestamp(),
+              false);
+        } else if (split instanceof BaseFileWithLogsSplit) {
+          BaseFileWithLogsSplit baseFileWithLogsSplit = unsafeCast(split);
+          return createHoodieRealtimeSplitUnchecked(baseFileWithLogsSplit, hoodieVirtualKeyInfoOpt);
+        } else {
+          // Non-Hudi paths might result in just generic {@code FileSplit} being
+          // propagated up to this point
+          return split;
+        }
+      })
+      .toArray(InputSplit[]::new);

Review comment:
       This conversion can be avoided by returning a `List<>` instead.
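       A sketch of that change, keeping `List` end-to-end and converting only at the MapReduce boundary (`convertSplit` is a hypothetical stand-in for the per-split branching above):

       ```java
       List<InputSplit> finalSplits = fileSplits.stream()
           .map(split -> convertSplit(split)) // same per-split logic as above
           .collect(Collectors.toList());
       return finalSplits;
       // Callers then convert once, at the InputFormat.getSplits(...) boundary:
       //   return splits.toArray(new InputSplit[0]);
       ```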

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -144,28 +204,32 @@
     return rtSplits.toArray(new InputSplit[0]);
   }
 
+  /**
+   * @deprecated will be replaced w/ {@link #getRealtimeSplits(Configuration, List)}
+   */
   // get IncrementalRealtimeSplits
-  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException {
+  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    checkState(fileSplits.stream().allMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery),
+        "All splits have to belong to incremental query");
+
     List<InputSplit> rtSplits = new ArrayList<>();
-    List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
-    Set<Path> partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
+    Set<Path> partitionSet = fileSplits.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet());
     Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet);
     // Pre process tableConfig from first partition to fetch virtual key info
     Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
     if (partitionSet.size() > 0) {
       hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
     }
     Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo;
-    fileSplitList.stream().forEach(s -> {
+    fileSplits.stream().forEach(s -> {
       // deal with incremental query.
       try {
         if (s instanceof BaseFileWithLogsSplit) {
-          BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
-          if (bs.getBelongToIncrementalSplit()) {
-            rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
-          }
+          BaseFileWithLogsSplit bs = unsafeCast(s);
+          rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
         } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
-          rtSplits.add(s);
+          RealtimeBootstrapBaseFileSplit bs = unsafeCast(s);

Review comment:
       Is the type cast redundant here?
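       If the narrowed type's API isn't used afterwards, the branch reduces to a cast-free form (a sketch, assuming `rtSplits` is a `List<InputSplit>` as above):

       ```java
       } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
         // `s` is already an InputSplit, so the upcast is implicit; unsafeCast
         // is only needed if RealtimeBootstrapBaseFileSplit's own API is called.
         rtSplits.add(s);
       }
       ```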

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##########
@@ -361,15 +386,19 @@ void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
         copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
         copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
 
-        List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+        List<String> dataFiles = tableView.getLatestBaseFiles()
+            .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+            .collect(Collectors.toList());
         List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
             basePath());
         assertEquals(200, recordsRead.size());
 
         statuses = nClient.upsert(jsc().parallelize(copyOfRecords, 1), newCommitTime).collect();
         // Verify there are no errors
         assertNoWriteErrors(statuses);
-        nClient.commit(newCommitTime, writeStatusJavaRDD);
+
+        nClient.commit(newCommitTime, jsc().parallelize(statuses));
+

Review comment:
       Let's revert the cosmetic changes in this PR to make review easier.  You can put up a separate PR for cleaning the code up.

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##########
@@ -516,8 +556,6 @@ void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {
     JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
     JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
     client.commit(newCommitTime, writeStatusJavaRDD);
-    List<WriteStatus> statuses = writeStatusJavaRDD.collect();
-    assertNoWriteErrors(statuses);

Review comment:
       Do we still want to keep this check?
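       If so, the pattern used earlier in this test class keeps the assertion ahead of the commit, e.g.:

       ```java
       JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
       List<WriteStatus> statuses = writeStatusJavaRDD.collect();
       assertNoWriteErrors(statuses); // verify before committing
       client.commit(newCommitTime, jsc().parallelize(statuses));
       ```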

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -440,6 +437,9 @@ public static HoodieMetadataConfig buildMetadataConfig(Configuration conf) {
         .build();
   }
 
+  /**
+   * @deprecated
+   */

Review comment:
       Should this be an actual `@Deprecated` annotation instead of just the Javadoc tag?
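       i.e., pair the Javadoc tag with the compiler-visible annotation. A sketch (the method shown is a placeholder, not the actual one in this hunk):

       ```java
       /**
        * @deprecated superseded by the new file-listing path.
        */
       @Deprecated
       public static List<FileStatus> legacyListing(JobConf job) {
         return Collections.emptyList(); // placeholder body
       }
       ```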

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,71 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    if (fileSplits.isEmpty()) {
+      return new InputSplit[0];
+    }
+
+    FileSplit fileSplit = fileSplits.get(0);
+
+    // Pre-process table-config to fetch virtual key info
+    Path partitionPath = fileSplit.getPath().getParent();
+    HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
+
+    // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+    HoodieInstant latestCommitInstant =
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+    InputSplit[] finalSplits = fileSplits.stream()
+      .map(split -> {
+        // There are 4 types of splits we could have to handle here
+        //    - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file,
+        //      but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit})
+        //    - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file
+        //      and does have log files appended
+        //    - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file
+        //       and does have log files appended;
+        //    - {@code FileSplit}: in case Hive passed down non-Hudi path
+        //
+        if (split instanceof RealtimeBootstrapBaseFileSplit) {
+          return split;
+        } else if (split instanceof BootstrapBaseFileSplit) {
+          BootstrapBaseFileSplit bootstrapBaseFileSplit = unsafeCast(split);
+          return createRealtimeBoostrapBaseFileSplit(
+              bootstrapBaseFileSplit,
+              metaClient.getBasePath(),
+              Collections.emptyList(),
+              latestCommitInstant.getTimestamp(),
+              false);
+        } else if (split instanceof BaseFileWithLogsSplit) {
+          BaseFileWithLogsSplit baseFileWithLogsSplit = unsafeCast(split);
+          return createHoodieRealtimeSplitUnchecked(baseFileWithLogsSplit, hoodieVirtualKeyInfoOpt);
+        } else {
+          // Non-Hudi paths might result in just generic {@code FileSplit} being
+          // propagated up to this point
+          return split;
+        }
+      })
+      .toArray(InputSplit[]::new);
+
+    LOG.info("Returning a total splits of " + finalSplits.length);
+
+    return finalSplits;
+  }
+
+  /**
+   * @deprecated
+   */

Review comment:
       Similar here.

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,71 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    if (fileSplits.isEmpty()) {
+      return new InputSplit[0];
+    }
+
+    FileSplit fileSplit = fileSplits.get(0);
+
+    // Pre-process table-config to fetch virtual key info
+    Path partitionPath = fileSplit.getPath().getParent();
+    HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
+
+    // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+    HoodieInstant latestCommitInstant =
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+    InputSplit[] finalSplits = fileSplits.stream()
+      .map(split -> {
+        // There are 4 types of splits we could have to handle here
+        //    - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file,
+        //      but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit})
+        //    - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file
+        //      and does have log files appended
+        //    - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file

Review comment:
       Should this scenario be considered here: base file does not have associated bootstrap file and does not have updates to it (i.e., no log files appended)?
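       If that case reaches this method as a plain `FileSplit`, it is already covered by the final else-branch; if so, extending the comment might make it explicit (a sketch):

       ```java
       //    - {@code FileSplit}: in case Hive passed down a non-Hudi path, or a base
       //      file that has neither a bootstrap file nor any log files appended
       ```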

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java
##########
@@ -43,17 +43,20 @@
 
   private String basePath;
 
-  public RealtimeBootstrapBaseFileSplit() {
-    super();
-  }

Review comment:
       Similar here.  I wonder what the reason was for keeping such empty-arg constructors in the first place; if there is one, maybe we should not remove them?

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,127 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes the {@link FileInputFormat#listStatus(JobConf)} operation to subclasses,
+   * listing files (returned as an array of {@link FileStatus}) that correspond to the input paths
+   * specified as part of the provided {@link JobConf}
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. Instead of listing all
+   * partitions and then filtering based on the commits of interest, this logic first extracts the
+   * partitions touched by the desired commits and then lists only those partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> inputPaths,
+                                                          String incrementalTable) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,

Review comment:
       It looks like this is newly added.  Add docs to explain the purpose?

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
##########
@@ -189,8 +190,10 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
 
       assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue()));
 
-      List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+      List<String> inputPaths = roView.getLatestBaseFiles()
+          .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+          .collect(Collectors.toList());

Review comment:
       Looking at this again, are these changes and similar ones in other test classes legit?  Basically, the list of results is changed from base file paths to partition paths.

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,71 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {

Review comment:
       Any reason for sticking to an array return type instead of `List` (for all the split-getting methods)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org