You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/02/15 08:36:55 UTC
[flink-table-store] branch master updated: [hotfix] Commit operation should not create specific manifest file for new changes
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new db7f3c5 [hotfix] Commit operation should not create specific manifest file for new changes
db7f3c5 is described below
commit db7f3c56d08aa2b1faca009de43df1c3340f554c
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Feb 15 16:36:49 2022 +0800
[hotfix] Commit operation should not create specific manifest file for new changes
This closes #21
---
.../store/file/manifest/ManifestFileMeta.java | 106 +++++++++++----------
.../store/file/operation/FileStoreCommitImpl.java | 15 ++-
.../store/file/manifest/ManifestFileMetaTest.java | 24 +++--
3 files changed, 78 insertions(+), 67 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
index 0c4aa8f..cc883d3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -123,13 +124,17 @@ public class ManifestFileMeta {
}
/**
- * Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s representing first adding and
- * then deleting the same sst file will cancel each other.
+ * Merge several {@link ManifestFileMeta}s with several {@link ManifestEntry}s. {@link
+ * ManifestEntry}s representing first adding and then deleting the same sst file will cancel
+ * each other.
*
* <p>NOTE: This method is atomic.
*/
public static List<ManifestFileMeta> merge(
- List<ManifestFileMeta> metas, ManifestFile manifestFile, long suggestedMetaSize) {
+ List<ManifestFileMeta> metas,
+ List<ManifestEntry> entries,
+ ManifestFile manifestFile,
+ long suggestedMetaSize) {
List<ManifestFileMeta> result = new ArrayList<>();
// these are the newly created manifest files, clean them up if exception occurs
List<ManifestFileMeta> newMetas = new ArrayList<>();
@@ -137,20 +142,29 @@ public class ManifestFileMeta {
long totalSize = 0;
try {
+ // merge existing manifests first
for (ManifestFileMeta manifest : metas) {
totalSize += manifest.fileSize;
candidate.add(manifest);
if (totalSize >= suggestedMetaSize) {
// reach suggested file size, perform merging and produce new file
- merge(candidate, manifestFile, result, newMetas);
+ ManifestFileMeta merged;
+ if (candidate.size() == 1) {
+ merged = candidate.get(0);
+ } else {
+ merged = mergeIntoOneFile(candidate, Collections.emptyList(), manifestFile);
+ newMetas.add(merged);
+ }
+ result.add(merged);
candidate.clear();
totalSize = 0;
}
}
- if (!candidate.isEmpty()) {
- // merge the last bit of metas
- merge(candidate, manifestFile, result, newMetas);
- }
+
+ // merge the last bit of metas with entries
+ ManifestFileMeta merged = mergeIntoOneFile(candidate, entries, manifestFile);
+ newMetas.add(merged);
+ result.add(merged);
} catch (Throwable e) {
// exception occurs, clean up and rethrow
for (ManifestFileMeta manifest : newMetas) {
@@ -162,54 +176,44 @@ public class ManifestFileMeta {
return result;
}
- private static void merge(
- List<ManifestFileMeta> metas,
- ManifestFile manifestFile,
- List<ManifestFileMeta> result,
- List<ManifestFileMeta> newMetas) {
- if (metas.size() > 1) {
- ManifestFileMeta newMeta = merge(metas, manifestFile);
- result.add(newMeta);
- newMetas.add(newMeta);
- } else {
- result.addAll(metas);
+ private static ManifestFileMeta mergeIntoOneFile(
+ List<ManifestFileMeta> metas, List<ManifestEntry> entries, ManifestFile manifestFile) {
+ Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
+ for (ManifestFileMeta manifest : metas) {
+ mergeEntries(manifestFile.read(manifest.fileName), map);
}
+ mergeEntries(entries, map);
+ return manifestFile.write(new ArrayList<>(map.values()));
}
- private static ManifestFileMeta merge(List<ManifestFileMeta> metas, ManifestFile manifestFile) {
- Preconditions.checkArgument(
- metas.size() > 1, "Number of ManifestFileMeta <= 1. This is a bug.");
-
- Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
- for (ManifestFileMeta manifest : metas) {
- for (ManifestEntry entry : manifestFile.read(manifest.fileName)) {
- ManifestEntry.Identifier identifier = entry.identifier();
- switch (entry.kind()) {
- case ADD:
- Preconditions.checkState(
- !map.containsKey(identifier),
- "Trying to add file %s which is already added. Manifest might be corrupted.",
- identifier);
+ private static void mergeEntries(
+ List<ManifestEntry> entries, Map<ManifestEntry.Identifier, ManifestEntry> map) {
+ for (ManifestEntry entry : entries) {
+ ManifestEntry.Identifier identifier = entry.identifier();
+ switch (entry.kind()) {
+ case ADD:
+ Preconditions.checkState(
+ !map.containsKey(identifier),
+ "Trying to add file %s which is already added. Manifest might be corrupted.",
+ identifier);
+ map.put(identifier, entry);
+ break;
+ case DELETE:
+ // each sst file will only be added once and deleted once,
+ // if we know that it is added before then both add and delete entry can be
+ // removed because there won't be further operations on this file,
+ // otherwise we have to keep the delete entry because the add entry must be
+ // in the previous manifest files
+ if (map.containsKey(identifier)) {
+ map.remove(identifier);
+ } else {
map.put(identifier, entry);
- break;
- case DELETE:
- // each sst file will only be added once and deleted once,
- // if we know that it is added before then both add and delete entry can be
- // removed because there won't be further operations on this file,
- // otherwise we have to keep the delete entry because the add entry must be
- // in the previous manifest files
- if (map.containsKey(identifier)) {
- map.remove(identifier);
- } else {
- map.put(identifier, entry);
- }
- break;
- default:
- throw new UnsupportedOperationException(
- "Unknown value kind " + entry.kind().name());
- }
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown value kind " + entry.kind().name());
}
}
- return manifestFile.write(new ArrayList<>(map.values()));
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 22eeec6..465c2ec 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -232,15 +232,14 @@ public class FileStoreCommitImpl implements FileStoreCommit {
if (latestSnapshot != null) {
// read all previous manifest files
oldMetas.addAll(manifestList.read(latestSnapshot.manifestList()));
- // merge manifest files
- newMetas.addAll(
- ManifestFileMeta.merge(
- oldMetas,
- manifestFile,
- fileStoreOptions.manifestSuggestedSize.getBytes()));
}
- // write all changes to manifest file
- newMetas.add(manifestFile.write(changes));
+ // merge manifest files with changes
+ newMetas.addAll(
+ ManifestFileMeta.merge(
+ oldMetas,
+ changes,
+ manifestFile,
+ fileStoreOptions.manifestSuggestedSize.getBytes()));
// prepare snapshot file
manifestListName = manifestList.write(newMetas);
newSnapshot =
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index 8ea0d88..cced937 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -73,10 +73,11 @@ public class ManifestFileMetaTest {
@Test
public void testMerge() {
List<ManifestFileMeta> input = new ArrayList<>();
+ List<ManifestEntry> entries = new ArrayList<>();
List<ManifestFileMeta> expected = new ArrayList<>();
- createData(input, expected);
+ createData(input, entries, expected);
- List<ManifestFileMeta> actual = ManifestFileMeta.merge(input, manifestFile, 500);
+ List<ManifestFileMeta> actual = ManifestFileMeta.merge(input, entries, manifestFile, 500);
assertThat(actual).hasSameSizeAs(expected);
// these three manifest files are merged from the input
@@ -107,13 +108,14 @@ public class ManifestFileMetaTest {
FailingAtomicRenameFileSystem.setFailPossibility(10);
List<ManifestFileMeta> input = new ArrayList<>();
- createData(input, null);
+ List<ManifestEntry> entries = new ArrayList<>();
+ createData(input, entries, null);
ManifestFile failingManifestFile =
createManifestFile(
FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
try {
- ManifestFileMeta.merge(input, failingManifestFile, 500);
+ ManifestFileMeta.merge(input, entries, failingManifestFile, 500);
} catch (Throwable e) {
assertThat(e)
.hasRootCauseExactlyInstanceOf(
@@ -144,14 +146,17 @@ public class ManifestFileMetaTest {
.create();
}
- private void createData(List<ManifestFileMeta> input, List<ManifestFileMeta> expected) {
+ private void createData(
+ List<ManifestFileMeta> input,
+ List<ManifestEntry> entries,
+ List<ManifestFileMeta> expected) {
// suggested size 500
// file sizes:
// 200, 300, -- multiple files exactly the suggested size
// 100, 200, 300, -- multiple files exceeding the suggested size
// 500, -- single file exactly the suggested size
// 600, -- single file exceeding the suggested size
- // 100, 200 -- not enough sizes, but the last bit
+ // 100, 300 -- not enough sizes, but the last bit
input.add(makeManifest(makeEntry(true, "A"), makeEntry(true, "B")));
input.add(makeManifest(makeEntry(true, "C"), makeEntry(false, "B"), makeEntry(true, "D")));
@@ -178,7 +183,10 @@ public class ManifestFileMetaTest {
makeEntry(true, "L")));
input.add(makeManifest(makeEntry(true, "M")));
- input.add(makeManifest(makeEntry(false, "M"), makeEntry(true, "N")));
+ input.add(makeManifest(makeEntry(false, "M"), makeEntry(true, "N"), makeEntry(true, "O")));
+
+ entries.add(makeEntry(false, "O"));
+ entries.add(makeEntry(true, "P"));
if (expected == null) {
return;
@@ -189,7 +197,7 @@ public class ManifestFileMetaTest {
expected.add(makeManifest(makeEntry(false, "A"), makeEntry(true, "F")));
expected.add(input.get(5));
expected.add(input.get(6));
- expected.add(makeManifest(makeEntry(true, "N")));
+ expected.add(makeManifest(makeEntry(true, "N"), makeEntry(true, "P")));
}
private ManifestFileMeta makeManifest(ManifestEntry... entries) {