You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/12/01 07:02:48 UTC
(incubator-paimon) 14/46: [core] Supports asynchronous manifest merge (#2391)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 89eca4701cd899fe72df13aa35b98978bbfaab78
Author: Liwei Li <hi...@gmail.com>
AuthorDate: Mon Nov 27 10:24:55 2023 +0800
[core] Supports asynchronous manifest merge (#2391)
---
.../org/apache/paimon/manifest/ManifestEntry.java | 28 ++++++++++++++++++++++
.../apache/paimon/manifest/ManifestFileMeta.java | 13 +++-------
2 files changed, 31 insertions(+), 10 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index bf62cf795..4ca944ed8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -25,6 +25,7 @@ import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Preconditions;
import java.util.ArrayList;
@@ -33,6 +34,9 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
import static org.apache.paimon.utils.SerializationUtils.newBytesType;
@@ -118,6 +122,30 @@ public class ManifestEntry {
return map.values();
}
+    /**
+     * Reads the given manifest files asynchronously and merges their entries into {@code map}.
+     *
+     * <p>One read task per manifest file is submitted to {@link
+     * FileUtils#COMMON_IO_FORK_JOIN_POOL}. The results are then consumed in the order of {@code
+     * manifestFiles}, so entries are merged in the same order as a sequential loop over the
+     * files would merge them; only the reads themselves may overlap.
+     *
+     * @param manifestFile reader used to load the entries of each manifest file
+     * @param manifestFiles metas of the manifest files to read, in merge order
+     * @param map accumulator that receives the merged entries; modified in place
+     * @throws RuntimeException if reading any manifest file fails, or if the calling thread is
+     *     interrupted while waiting for a read (the interrupt flag is restored before throwing)
+     */
+    public static void mergeEntries(
+            ManifestFile manifestFile,
+            List<ManifestFileMeta> manifestFiles,
+            Map<Identifier, ManifestEntry> map) {
+        // Submit all reads up front so they can proceed while earlier results are being merged.
+        List<CompletableFuture<List<ManifestEntry>>> manifestReadFutures =
+                manifestFiles.stream()
+                        .map(
+                                manifestFileMeta ->
+                                        CompletableFuture.supplyAsync(
+                                                () ->
+                                                        manifestFile.read(
+                                                                manifestFileMeta.fileName()),
+                                                FileUtils.COMMON_IO_FORK_JOIN_POOL))
+                        .collect(Collectors.toList());
+
+        try {
+            // Consume the futures in list order: the merge result depends on entry order.
+            for (CompletableFuture<List<ManifestEntry>> taskResult : manifestReadFutures) {
+                mergeEntries(taskResult.get(), map);
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Failed to read manifest file.", e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Failed to read manifest file.", e);
+        }
+    }
+
public static void mergeEntries(
Iterable<ManifestEntry> entries, Map<Identifier, ManifestEntry> map) {
for (ManifestEntry entry : entries) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index e4e7f707b..b917e27fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -215,9 +215,7 @@ public class ManifestFileMeta {
}
Map<Identifier, ManifestEntry> map = new LinkedHashMap<>();
- for (ManifestFileMeta manifest : candidates) {
- ManifestEntry.mergeEntries(manifestFile.read(manifest.fileName), map);
- }
+ ManifestEntry.mergeEntries(manifestFile, candidates, map);
if (!map.isEmpty()) {
List<ManifestFileMeta> merged = manifestFile.write(new ArrayList<>(map.values()));
result.addAll(merged);
@@ -272,9 +270,7 @@ public class ManifestFileMeta {
// 2.1. try to skip base files by partition filter
Map<Identifier, ManifestEntry> deltaMerged = new LinkedHashMap<>();
- for (ManifestFileMeta manifest : delta) {
- ManifestEntry.mergeEntries(manifestFile.read(manifest.fileName), deltaMerged);
- }
+ ManifestEntry.mergeEntries(manifestFile, delta, deltaMerged);
List<ManifestFileMeta> result = new ArrayList<>();
int j = 0;
@@ -338,10 +334,7 @@ public class ManifestFileMeta {
// 2.3. merge base files
- for (; j < base.size(); j++) {
- ManifestFileMeta manifestFileMeta = base.get(j);
- ManifestEntry.mergeEntries(manifestFile.read(manifestFileMeta.fileName), fullMerged);
- }
+ ManifestEntry.mergeEntries(manifestFile, base.subList(j, base.size()), fullMerged);
ManifestEntry.mergeEntries(deltaMerged.values(), fullMerged);
// 2.4. write new manifest files