Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/02/03 01:56:57 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #4556: [HUDI-3191] Removing duplicating file-listing process w/in Hive's MOR `FileInputFormat`s

nsivabalan commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r798155468



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,70 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {
+    if (fileSplits.isEmpty()) {
+      return new InputSplit[0];
+    }
+
+    FileSplit fileSplit = fileSplits.get(0);
+
+    // Pre-process table-config to fetch virtual key info
+    Path partitionPath = fileSplit.getPath().getParent();
+    HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient);
+
+    // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+    HoodieInstant latestCommitInstant =
+        metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+    InputSplit[] finalSplits = fileSplits.stream()
+      .map(split -> {
+        // There are 4 types of splits we could have to handle here
+        //    - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file,
+        //      but does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit})
+        //    - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file
+        //      and does have log files appended
+        //    - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file
+        //       and does have log files appended;
+        //    - {@code FileSplit}: in case Hive passed down non-Hudi path
+        if (split instanceof RealtimeBootstrapBaseFileSplit) {
+          return split;
+        } else if (split instanceof BootstrapBaseFileSplit) {
+          BootstrapBaseFileSplit bootstrapBaseFileSplit = unsafeCast(split);
+          return createRealtimeBoostrapBaseFileSplit(
+              bootstrapBaseFileSplit,
+              metaClient.getBasePath(),
+              Collections.emptyList(),
+              latestCommitInstant.getTimestamp(),
+              false);
+        } else if (split instanceof BaseFileWithLogsSplit) {
+          BaseFileWithLogsSplit baseFileWithLogsSplit = unsafeCast(split);

Review comment:
       Will the maxCommitTime in baseFileSplit be in sync with the latestCommitInstant computed at L89? Prior to this patch, we used the latestCommitInstant computed here, whereas now we just reuse the one that comes from BaseFileWithLogsSplit.
   Just wanted to confirm, as this code is new to me.
   
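To make the four-way dispatch in the quoted hunk concrete, here is a minimal, self-contained Java sketch. It only illustrates the `instanceof`-based classification; the split types are modeled as plain marker classes, and `SplitDispatchSketch`, `classify`, and the returned labels are hypothetical names, not part of the actual patch or the Hudi API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SplitDispatchSketch {
  // Marker classes standing in for the Hudi split hierarchy (assumption:
  // RealtimeBootstrapBaseFileSplit is a subtype of BootstrapBaseFileSplit).
  static class FileSplit {}
  static class BootstrapBaseFileSplit extends FileSplit {}
  static class RealtimeBootstrapBaseFileSplit extends BootstrapBaseFileSplit {}
  static class BaseFileWithLogsSplit extends FileSplit {}

  // Classify each incoming split the way the patched method does:
  // already-realtime splits pass through, bootstrap and log splits get
  // wrapped, and anything else is treated as a plain non-Hudi split.
  static String classify(FileSplit split) {
    if (split instanceof RealtimeBootstrapBaseFileSplit) {
      return "pass-through";   // bootstrap file with logs, already realtime
    } else if (split instanceof BootstrapBaseFileSplit) {
      return "wrap-bootstrap"; // bootstrap file, no log files appended
    } else if (split instanceof BaseFileWithLogsSplit) {
      return "wrap-with-logs"; // base file with log files appended
    } else {
      return "non-hudi";       // plain FileSplit passed down by Hive
    }
  }

  public static void main(String[] args) {
    List<FileSplit> splits = Arrays.asList(
        new RealtimeBootstrapBaseFileSplit(),
        new BootstrapBaseFileSplit(),
        new BaseFileWithLogsSplit(),
        new FileSplit());
    System.out.println(splits.stream()
        .map(SplitDispatchSketch::classify)
        .collect(Collectors.joining(",")));
    // prints: pass-through,wrap-bootstrap,wrap-with-logs,non-hudi
  }
}
```

Note the ordering: since the realtime bootstrap split is the more specific subtype, its `instanceof` check must come before the plain `BootstrapBaseFileSplit` check, exactly as in the quoted code.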

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,70 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
-  public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException {

Review comment:
       This refactoring makes total sense, assuming each FileSplit corresponds to one FileSlice and there won't be a case where multiple FileSplits store info about a single FileSlice.
   Thanks for doing this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org