Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/11/15 19:39:04 UTC

[GitHub] [hudi] alexeykudinkin commented on a change in pull request #3986: [HUDI-2550][WIP] Expand File-Group candidates list for appending for MOR tables

alexeykudinkin commented on a change in pull request #3986:
URL: https://github.com/apache/hudi/pull/3986#discussion_r749626727



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
##########
@@ -51,68 +52,70 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEng
 
   @Override
   protected List<SmallFile> getSmallFiles(String partitionPath) {
-
-    // smallFiles only for partitionPath
-    List<SmallFile> smallFileLocations = new ArrayList<>();
-
     // Init here since this class (and member variables) might not have been initialized
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
 
-    // Find out all eligible small file slices
-    if (!commitTimeline.empty()) {
-      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // find smallest file in partition and append to it
-      List<FileSlice> allSmallFileSlices = new ArrayList<>();
-      // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
-      // it. Doing this overtime for a partition, we ensure that we handle small file issues
-      if (!table.getIndex().canIndexLogFiles()) {
-        // TODO : choose last N small files since there can be multiple small files written to a single partition
-        // by different spark partitions in a single batch
-        Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView()
-            .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
-            .filter(
-                fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config
-                    .getParquetSmallFileLimit())
-            .min((FileSlice left, FileSlice right) ->
-                left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
-        if (smallFileSlice.isPresent()) {
-          allSmallFileSlices.add(smallFileSlice.get());
-        }
+    if (commitTimeline.empty()) {
+      return Collections.emptyList();
+    }
+
+    HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+
+    // Find out all eligible small file slices, looking for
+    // smallest file in the partition to append to
+    List<FileSlice> smallFileSlicesCandidates = pickSmallFileCandidates(partitionPath, latestCommitTime);
+    List<SmallFile> smallFileLocations = new ArrayList<>();
+
+    // Create SmallFiles from the eligible file slices
+    for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
+      SmallFile sf = new SmallFile();
+      if (smallFileSlice.getBaseFile().isPresent()) {
+        // TODO : Move logic of file name, file id, base commit time handling inside file slice
+        String filename = smallFileSlice.getBaseFile().get().getFileName();
+        sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       } else {
-        // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
-        // pending compaction
-        List<FileSlice> allFileSlices =
-            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
-                .collect(Collectors.toList());
-        for (FileSlice fileSlice : allFileSlices) {
-          if (isSmallFile(fileSlice)) {
-            allSmallFileSlices.add(fileSlice);
-          }
-        }
-      }
-      // Create SmallFiles from the eligible file slices
-      for (FileSlice smallFileSlice : allSmallFileSlices) {
-        SmallFile sf = new SmallFile();
-        if (smallFileSlice.getBaseFile().isPresent()) {
-          // TODO : Move logic of file name, file id, base commit time handling inside file slice
-          String filename = smallFileSlice.getBaseFile().get().getFileName();
-          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        } else {
-          HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
-          sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-              FSUtils.getFileIdFromLogPath(logFile.getPath()));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        }
+        HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
+        sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+            FSUtils.getFileIdFromLogPath(logFile.getPath()));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       }
     }
     return smallFileLocations;
   }
 
+  @Nonnull
+  private List<FileSlice> pickSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) {
+    // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
+    // pending compaction
+    if (table.getIndex().canIndexLogFiles()) {
+      return table.getSliceView()
+              .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
+              .filter(this::isSmallFile)
+              .collect(Collectors.toList());
+    }
+
+    // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
+    // it. Doing this overtime for a partition, we ensure that we handle small file issues
+    // TODO : choose last N small files since there can be multiple small files written to a single partition

Review comment:
       Good catch. Forgot to clean it up.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
##########
@@ -51,68 +52,70 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEng
 
   @Override
   protected List<SmallFile> getSmallFiles(String partitionPath) {
-
-    // smallFiles only for partitionPath
-    List<SmallFile> smallFileLocations = new ArrayList<>();
-
     // Init here since this class (and member variables) might not have been initialized
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
 
-    // Find out all eligible small file slices
-    if (!commitTimeline.empty()) {
-      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // find smallest file in partition and append to it
-      List<FileSlice> allSmallFileSlices = new ArrayList<>();
-      // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
-      // it. Doing this overtime for a partition, we ensure that we handle small file issues
-      if (!table.getIndex().canIndexLogFiles()) {
-        // TODO : choose last N small files since there can be multiple small files written to a single partition
-        // by different spark partitions in a single batch
-        Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView()
-            .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
-            .filter(
-                fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config
-                    .getParquetSmallFileLimit())
-            .min((FileSlice left, FileSlice right) ->
-                left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
-        if (smallFileSlice.isPresent()) {
-          allSmallFileSlices.add(smallFileSlice.get());
-        }
+    if (commitTimeline.empty()) {
+      return Collections.emptyList();
+    }
+
+    HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+
+    // Find out all eligible small file slices, looking for
+    // smallest file in the partition to append to
+    List<FileSlice> smallFileSlicesCandidates = pickSmallFileCandidates(partitionPath, latestCommitTime);
+    List<SmallFile> smallFileLocations = new ArrayList<>();
+
+    // Create SmallFiles from the eligible file slices
+    for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
+      SmallFile sf = new SmallFile();
+      if (smallFileSlice.getBaseFile().isPresent()) {
+        // TODO : Move logic of file name, file id, base commit time handling inside file slice
+        String filename = smallFileSlice.getBaseFile().get().getFileName();
+        sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       } else {
-        // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
-        // pending compaction
-        List<FileSlice> allFileSlices =
-            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
-                .collect(Collectors.toList());
-        for (FileSlice fileSlice : allFileSlices) {
-          if (isSmallFile(fileSlice)) {
-            allSmallFileSlices.add(fileSlice);
-          }
-        }
-      }
-      // Create SmallFiles from the eligible file slices
-      for (FileSlice smallFileSlice : allSmallFileSlices) {
-        SmallFile sf = new SmallFile();
-        if (smallFileSlice.getBaseFile().isPresent()) {
-          // TODO : Move logic of file name, file id, base commit time handling inside file slice
-          String filename = smallFileSlice.getBaseFile().get().getFileName();
-          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        } else {
-          HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
-          sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-              FSUtils.getFileIdFromLogPath(logFile.getPath()));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        }
+        HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
+        sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+            FSUtils.getFileIdFromLogPath(logFile.getPath()));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       }
     }
     return smallFileLocations;
   }
 
+  @Nonnull
+  private List<FileSlice> pickSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) {
+    // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
+    // pending compaction
+    if (table.getIndex().canIndexLogFiles()) {
+      return table.getSliceView()
+              .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
+              .filter(this::isSmallFile)
+              .collect(Collectors.toList());
+    }
+

Review comment:
       I'm actually deliberately inverting those conditionals to drop some of the else blocks and reduce nesting by facilitating early returns
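
       For illustration, here's a minimal, self-contained sketch of the pattern being described (hypothetical method and type names, not the actual Hudi code), contrasting nested conditionals with guard-clause early returns:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Minimal sketch contrasting nested conditionals with the early-return
// (guard clause) style; names and types here are illustrative only.
public class EarlyReturnSketch {

  // Nested style: every condition adds another level of indentation and an else branch.
  static List<String> pickCandidatesNested(List<String> files, boolean canIndexLogs) {
    List<String> candidates = new ArrayList<>();
    if (!files.isEmpty()) {
      if (canIndexLogs) {
        for (String f : files) {
          if (f.endsWith(".log")) {
            candidates.add(f);
          }
        }
      } else {
        // fall back to a single candidate when log files cannot be indexed
        candidates.add(files.get(0));
      }
    }
    return candidates;
  }

  // Early-return style: guard clauses handle the empty and "can index" cases up front,
  // so each remaining branch reads top-to-bottom without else blocks.
  static List<String> pickCandidatesEarlyReturn(List<String> files, boolean canIndexLogs) {
    if (files.isEmpty()) {
      return Collections.emptyList();
    }
    if (canIndexLogs) {
      List<String> candidates = new ArrayList<>();
      for (String f : files) {
        if (f.endsWith(".log")) {
          candidates.add(f);
        }
      }
      return candidates;
    }
    // fall back to a single candidate when log files cannot be indexed
    return Collections.singletonList(files.get(0));
  }

  public static void main(String[] args) {
    List<String> files = Arrays.asList("a.log", "b.parquet");
    System.out.println(pickCandidatesNested(files, true));      // [a.log]
    System.out.println(pickCandidatesEarlyReturn(files, true)); // [a.log]
  }
}
```

       Both methods return the same result; the second simply keeps the happy path at the top level of the method, which is the effect the refactor above is going for.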

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
##########
@@ -51,68 +52,70 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEng
 
   @Override
   protected List<SmallFile> getSmallFiles(String partitionPath) {
-
-    // smallFiles only for partitionPath
-    List<SmallFile> smallFileLocations = new ArrayList<>();
-
     // Init here since this class (and member variables) might not have been initialized
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
 
-    // Find out all eligible small file slices
-    if (!commitTimeline.empty()) {
-      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // find smallest file in partition and append to it
-      List<FileSlice> allSmallFileSlices = new ArrayList<>();
-      // If we cannot index log files, then we choose the smallest parquet file in the partition and add inserts to
-      // it. Doing this overtime for a partition, we ensure that we handle small file issues
-      if (!table.getIndex().canIndexLogFiles()) {
-        // TODO : choose last N small files since there can be multiple small files written to a single partition
-        // by different spark partitions in a single batch
-        Option<FileSlice> smallFileSlice = Option.fromJavaOptional(table.getSliceView()
-            .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
-            .filter(
-                fileSlice -> fileSlice.getLogFiles().count() < 1 && fileSlice.getBaseFile().get().getFileSize() < config
-                    .getParquetSmallFileLimit())
-            .min((FileSlice left, FileSlice right) ->
-                left.getBaseFile().get().getFileSize() < right.getBaseFile().get().getFileSize() ? -1 : 1));
-        if (smallFileSlice.isPresent()) {
-          allSmallFileSlices.add(smallFileSlice.get());
-        }
+    if (commitTimeline.empty()) {
+      return Collections.emptyList();
+    }
+
+    HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+
+    // Find out all eligible small file slices, looking for
+    // smallest file in the partition to append to
+    List<FileSlice> smallFileSlicesCandidates = pickSmallFileCandidates(partitionPath, latestCommitTime);
+    List<SmallFile> smallFileLocations = new ArrayList<>();
+
+    // Create SmallFiles from the eligible file slices
+    for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
+      SmallFile sf = new SmallFile();
+      if (smallFileSlice.getBaseFile().isPresent()) {
+        // TODO : Move logic of file name, file id, base commit time handling inside file slice
+        String filename = smallFileSlice.getBaseFile().get().getFileName();
+        sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       } else {
-        // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
-        // pending compaction
-        List<FileSlice> allFileSlices =
-            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
-                .collect(Collectors.toList());
-        for (FileSlice fileSlice : allFileSlices) {
-          if (isSmallFile(fileSlice)) {
-            allSmallFileSlices.add(fileSlice);
-          }
-        }
-      }
-      // Create SmallFiles from the eligible file slices
-      for (FileSlice smallFileSlice : allSmallFileSlices) {
-        SmallFile sf = new SmallFile();
-        if (smallFileSlice.getBaseFile().isPresent()) {
-          // TODO : Move logic of file name, file id, base commit time handling inside file slice
-          String filename = smallFileSlice.getBaseFile().get().getFileName();
-          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        } else {
-          HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
-          sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-              FSUtils.getFileIdFromLogPath(logFile.getPath()));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        }
+        HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
+        sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+            FSUtils.getFileIdFromLogPath(logFile.getPath()));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       }
     }
     return smallFileLocations;
   }
 
+  @Nonnull
+  private List<FileSlice> pickSmallFileCandidates(String partitionPath, HoodieInstant latestCommitInstant) {
+    // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
+    // pending compaction
+    if (table.getIndex().canIndexLogFiles()) {
+      return table.getSliceView()
+              .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitInstant.getTimestamp(), false)
+              .filter(this::isSmallFile)
+              .collect(Collectors.toList());
+    }
+

Review comment:
       Here's a [good example](https://softwareengineering.stackexchange.com/a/18473/326225) of how early returns streamline control flow




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org