Posted to commits@hudi.apache.org by "bvaradar (via GitHub)" <gi...@apache.org> on 2023/04/02 03:22:33 UTC

[GitHub] [hudi] bvaradar commented on a diff in pull request #7143: [HUDI-5175] Improving FileIndex load performance in PARALLELISM mode

bvaradar commented on code in PR #7143:
URL: https://github.com/apache/hudi/pull/7143#discussion_r1155230011


##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -229,17 +238,93 @@ protected List<FileSlice> getInputFileSlices(PartitionPath partition) {
   }
 
   private Map<PartitionPath, List<FileSlice>> loadFileSlicesForPartitions(List<PartitionPath> partitions) {
-    FileStatus[] allFiles = listPartitionPathFiles(partitions);
+    Pair<Map<Path, FileStatus[]>, Map<String, FileStatus[]>> partitionFilesPair = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
 
-    HoodieTableFileSystemView fileSystemView =
-        new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
-
     Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::getTimestamp));
 
     validate(activeTimeline, queryInstant);
 
+    int parallelism = Integer.parseInt(String.valueOf(configProperties.getOrDefault(HoodieCommonConfig.TABLE_LOADING_PARALLELISM.key(),
+        HoodieCommonConfig.TABLE_LOADING_PARALLELISM.defaultValue())));

Review Comment:
   By default, we should disable this. Only after this code path has had a sufficient runway of real usage should we enable parallelism.
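
   For illustration, a disabled-by-default setting could look like the sketch below. This is a hypothetical sketch only: the key name, default value, and documentation text are assumptions, not the PR's actual definition.

   ```java
   // Hypothetical sketch: a parallelism config that is disabled by default.
   // Assumes Hudi's ConfigProperty builder; key name and doc text are illustrative.
   public static final ConfigProperty<Integer> TABLE_LOADING_PARALLELISM = ConfigProperty
       .key("hoodie.common.table.loading.parallelism")
       .defaultValue(0) // 0 keeps the existing sequential path as the default
       .withDocumentation("Parallelism used when loading file slices for the file index. "
           + "0 (the default) disables the parallel path until it has had sufficient runway.");
   ```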



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -229,17 +238,93 @@ protected List<FileSlice> getInputFileSlices(PartitionPath partition) {
   }
 
   private Map<PartitionPath, List<FileSlice>> loadFileSlicesForPartitions(List<PartitionPath> partitions) {
-    FileStatus[] allFiles = listPartitionPathFiles(partitions);
+    Pair<Map<Path, FileStatus[]>, Map<String, FileStatus[]>> partitionFilesPair = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
 
-    HoodieTableFileSystemView fileSystemView =
-        new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
-
     Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::getTimestamp));
 
     validate(activeTimeline, queryInstant);
 
+    int parallelism = Integer.parseInt(String.valueOf(configProperties.getOrDefault(HoodieCommonConfig.TABLE_LOADING_PARALLELISM.key(),
+        HoodieCommonConfig.TABLE_LOADING_PARALLELISM.defaultValue())));
+
+    Map<PartitionPath, List<FileSlice>> cachedAllInputFileSlices;
+    long buildCacheFileSlicesLocalStart = System.currentTimeMillis();
+    if (parallelism > 0 && partitions.size() > 0) {
+
+      // convert Map<Path, FileStatus[]> to Map<String, FileStatus[]>
+      Map<String, FileStatus[]> left = partitionFilesPair.getLeft().entrySet().stream().map(entry -> {
+        String partitionPath = entry.getKey().toString();
+        FileStatus[] statuses = entry.getValue();
+        return Pair.of(partitionPath, statuses);
+      }).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+      Map<String, FileStatus[]> partitionFiles = combine(left, partitionFilesPair.getRight());
+
+      cachedAllInputFileSlices = buildCacheFileSlicesLocalParallel(parallelism, partitions, partitionFiles, activeTimeline, queryInstant);
+    } else {
+      FileStatus[] allFiles = combine(flatMap(partitionFilesPair.getLeft().values()), flatMap(partitionFilesPair.getRight().values()));
+      HoodieTableFileSystemView fileSystemView =
+          new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
+
+      cachedAllInputFileSlices =  getCandidateFileSlices(partitions, queryInstant, fileSystemView);
+    }
+
+    long buildCacheFileSlicesLocalEnd = System.currentTimeMillis();
+    LOG.info(String.format("Build cache file slices, spent: %d ms", buildCacheFileSlicesLocalEnd - buildCacheFileSlicesLocalStart));
+
+    return cachedAllInputFileSlices;
+  }
+
+  private Map<PartitionPath, List<FileSlice>> buildCacheFileSlicesLocalParallel(int parallelism, List<PartitionPath> partitions, Map<String, FileStatus[]> partitionFiles,
+                                                                                HoodieTimeline activeTimeline, Option<String> queryInstant) {
+    HashMap<PartitionPath, List<FileSlice>> res = new HashMap<>();
+    parallelism = Math.max(1, Math.min(parallelism, partitionFiles.size()));
+    int totalPartitions = partitionFiles.size();
+    int cursor = 0;
+    int step = totalPartitions / parallelism;
+
+    ExecutorService pool =  Executors.newFixedThreadPool((parallelism + 1));
+    ArrayList<CompletableFuture<Map<PartitionPath, List<FileSlice>>>> futureList = new ArrayList<>(parallelism + 1);
+
+    while (cursor + step <= totalPartitions) {

Review Comment:
   Can we use simple Java Streams (parallelStream) to parallelize here, instead of subdividing the partitions and then running parallel streams over each chunk?
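
   A minimal sketch of that suggestion, assuming a per-partition helper (buildFileSlicesForPartition is hypothetical, not an existing method in this class):

   ```java
   // Hypothetical sketch: replace the manual chunking and executor pool with a
   // parallel stream. buildFileSlicesForPartition is an assumed helper that
   // resolves the candidate file slices for one partition against the given
   // timeline and query instant.
   Map<PartitionPath, List<FileSlice>> res = partitions.parallelStream()
       .collect(Collectors.toConcurrentMap(
           partition -> partition,
           partition -> buildFileSlicesForPartition(partition, partitionFiles,
               activeTimeline, queryInstant)));
   ```

   One trade-off to keep in mind: parallelStream runs on the common ForkJoinPool, so the configured parallelism value would not directly bound the thread count without extra plumbing.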



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -229,17 +238,93 @@ protected List<FileSlice> getInputFileSlices(PartitionPath partition) {
   }
 
   private Map<PartitionPath, List<FileSlice>> loadFileSlicesForPartitions(List<PartitionPath> partitions) {
-    FileStatus[] allFiles = listPartitionPathFiles(partitions);
+    Pair<Map<Path, FileStatus[]>, Map<String, FileStatus[]>> partitionFilesPair = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
 
-    HoodieTableFileSystemView fileSystemView =
-        new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
-
     Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::getTimestamp));
 
     validate(activeTimeline, queryInstant);
 
+    int parallelism = Integer.parseInt(String.valueOf(configProperties.getOrDefault(HoodieCommonConfig.TABLE_LOADING_PARALLELISM.key(),
+        HoodieCommonConfig.TABLE_LOADING_PARALLELISM.defaultValue())));
+
+    Map<PartitionPath, List<FileSlice>> cachedAllInputFileSlices;

Review Comment:
   Rename to allCachedInputFileSlices



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -229,17 +238,93 @@ protected List<FileSlice> getInputFileSlices(PartitionPath partition) {
   }
 
   private Map<PartitionPath, List<FileSlice>> loadFileSlicesForPartitions(List<PartitionPath> partitions) {
-    FileStatus[] allFiles = listPartitionPathFiles(partitions);
+    Pair<Map<Path, FileStatus[]>, Map<String, FileStatus[]>> partitionFilesPair = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
 
-    HoodieTableFileSystemView fileSystemView =
-        new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
-
     Option<String> queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::getTimestamp));
 
     validate(activeTimeline, queryInstant);
 
+    int parallelism = Integer.parseInt(String.valueOf(configProperties.getOrDefault(HoodieCommonConfig.TABLE_LOADING_PARALLELISM.key(),
+        HoodieCommonConfig.TABLE_LOADING_PARALLELISM.defaultValue())));
+
+    Map<PartitionPath, List<FileSlice>> cachedAllInputFileSlices;
+    long buildCacheFileSlicesLocalStart = System.currentTimeMillis();
+    if (parallelism > 0 && partitions.size() > 0) {
+
+      // convert Map<Path, FileStatus[]> to Map<String, FileStatus[]>
+      Map<String, FileStatus[]> left = partitionFilesPair.getLeft().entrySet().stream().map(entry -> {

Review Comment:
   Wouldn't it be simpler to have the return type of listPartitionPathFiles use String instead of Path for the partition path? That would avoid these transformations.
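
   A rough sketch of the suggested signature change (the body is elided; only the shape of the return value changes):

   ```java
   // Hypothetical signature sketch: keying both maps by String up front
   // avoids the Path-to-String transformation at the call site.
   private Pair<Map<String, FileStatus[]>, Map<String, FileStatus[]>> listPartitionPathFiles(
       List<PartitionPath> partitions) {
     // ... list files per partition, keying each entry by its partition path string ...
   }
   ```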



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -229,17 +238,93 @@ protected List<FileSlice> getInputFileSlices(PartitionPath partition) {
   }
 
   private Map<PartitionPath, List<FileSlice>> loadFileSlicesForPartitions(List<PartitionPath> partitions) {
-    FileStatus[] allFiles = listPartitionPathFiles(partitions);
+    Pair<Map<Path, FileStatus[]>, Map<String, FileStatus[]>> partitionFilesPair = listPartitionPathFiles(partitions);

Review Comment:
   From the code change, it does not look like you need to differentiate cached and fetched file statuses. Can you keep them as one return value?
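
   A minimal sketch of collapsing the pair into a single return value; cachedListings and fetchedListings are stand-ins for the two maps the method currently builds:

   ```java
   // Hypothetical sketch: merge cached and freshly fetched listings into one map
   // before returning, so callers never see the cached/fetched split.
   private Map<String, FileStatus[]> listPartitionPathFiles(List<PartitionPath> partitions) {
     Map<String, FileStatus[]> allFiles = new HashMap<>(cachedListings);
     allFiles.putAll(fetchedListings);
     return allFiles;
   }
   ```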



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org