You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/12/01 07:02:48 UTC

(incubator-paimon) 14/46: [core] Supports asynchronous manifest merge (#2391)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 89eca4701cd899fe72df13aa35b98978bbfaab78
Author: Liwei Li <hi...@gmail.com>
AuthorDate: Mon Nov 27 10:24:55 2023 +0800

    [core] Supports asynchronous manifest merge (#2391)
---
 .../org/apache/paimon/manifest/ManifestEntry.java  | 28 ++++++++++++++++++++++
 .../apache/paimon/manifest/ManifestFileMeta.java   | 13 +++-------
 2 files changed, 31 insertions(+), 10 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index bf62cf795..4ca944ed8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -25,6 +25,7 @@ import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TinyIntType;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.Preconditions;
 
 import java.util.ArrayList;
@@ -33,6 +34,9 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.SerializationUtils.newBytesType;
 
@@ -118,6 +122,67 @@ public class ManifestEntry {
         return map.values();
     }
 
+    /**
+     * Reads the given manifest files in parallel and merges their entries into {@code map}.
+     *
+     * <p>The reads are submitted to {@link FileUtils#COMMON_IO_FORK_JOIN_POOL}, but their results
+     * are merged strictly in the order of {@code manifestFiles}, so the outcome matches reading
+     * and merging the files one after another.
+     *
+     * <p>NOTE(review): every read is submitted up front, so the entries of all files may be held
+     * in memory at the same time; confirm this is acceptable for long manifest lists.
+     *
+     * @param manifestFile reader used to load each manifest file
+     * @param manifestFiles manifest files to read; merged in list order
+     * @param map destination map the entries are merged into
+     * @throws RuntimeException if a manifest file cannot be read or the calling thread is
+     *     interrupted; reads that have not completed yet are cancelled on a best-effort basis
+     */
+    public static void mergeEntries(
+            ManifestFile manifestFile,
+            List<ManifestFileMeta> manifestFiles,
+            Map<Identifier, ManifestEntry> map) {
+        List<CompletableFuture<List<ManifestEntry>>> manifestReadFutures =
+                manifestFiles.stream()
+                        .map(
+                                manifestFileMeta ->
+                                        CompletableFuture.supplyAsync(
+                                                () ->
+                                                        manifestFile.read(
+                                                                manifestFileMeta.fileName()),
+                                                FileUtils.COMMON_IO_FORK_JOIN_POOL))
+                        .collect(Collectors.toList());
+
+        // Merge in list order rather than completion order so the result matches a sequential
+        // read-and-merge; merging into the map is presumably order-sensitive.
+        for (int i = 0; i < manifestReadFutures.size(); i++) {
+            List<ManifestEntry> entries;
+            try {
+                entries = manifestReadFutures.get(i).get();
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so callers can still observe the interruption.
+                Thread.currentThread().interrupt();
+                cancelAll(manifestReadFutures);
+                throw new RuntimeException("Interrupted while reading manifest files.", e);
+            } catch (ExecutionException e) {
+                cancelAll(manifestReadFutures);
+                // Chain the real failure rather than the ExecutionException wrapper.
+                throw new RuntimeException(
+                        "Failed to read manifest file " + manifestFiles.get(i).fileName() + ".",
+                        e.getCause());
+            }
+            mergeEntries(entries, map);
+        }
+    }
+
+    /**
+     * Best-effort cancellation of reads that have not completed yet. {@link
+     * CompletableFuture#cancel} does not interrupt a read that is already running.
+     */
+    private static void cancelAll(List<? extends CompletableFuture<?>> futures) {
+        futures.forEach(future -> future.cancel(false));
+    }
+
     public static void mergeEntries(
             Iterable<ManifestEntry> entries, Map<Identifier, ManifestEntry> map) {
         for (ManifestEntry entry : entries) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index e4e7f707b..b917e27fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -215,9 +215,7 @@ public class ManifestFileMeta {
         }
 
         Map<Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : candidates) {
-            ManifestEntry.mergeEntries(manifestFile.read(manifest.fileName), map);
-        }
+        ManifestEntry.mergeEntries(manifestFile, candidates, map);
         if (!map.isEmpty()) {
             List<ManifestFileMeta> merged = manifestFile.write(new ArrayList<>(map.values()));
             result.addAll(merged);
@@ -272,9 +270,7 @@ public class ManifestFileMeta {
         // 2.1. try to skip base files by partition filter
 
         Map<Identifier, ManifestEntry> deltaMerged = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : delta) {
-            ManifestEntry.mergeEntries(manifestFile.read(manifest.fileName), deltaMerged);
-        }
+        ManifestEntry.mergeEntries(manifestFile, delta, deltaMerged);
 
         List<ManifestFileMeta> result = new ArrayList<>();
         int j = 0;
@@ -338,10 +334,7 @@ public class ManifestFileMeta {
 
         // 2.3. merge base files
 
-        for (; j < base.size(); j++) {
-            ManifestFileMeta manifestFileMeta = base.get(j);
-            ManifestEntry.mergeEntries(manifestFile.read(manifestFileMeta.fileName), fullMerged);
-        }
+        ManifestEntry.mergeEntries(manifestFile, base.subList(j, base.size()), fullMerged);
         ManifestEntry.mergeEntries(deltaMerged.values(), fullMerged);
 
         // 2.4. write new manifest files