You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/02/15 08:36:55 UTC

[flink-table-store] branch master updated: [hotfix] Commit operation should not create specific manifest file for new changes

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new db7f3c5  [hotfix] Commit operation should not create specific manifest file for new changes
db7f3c5 is described below

commit db7f3c56d08aa2b1faca009de43df1c3340f554c
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Feb 15 16:36:49 2022 +0800

    [hotfix] Commit operation should not create specific manifest file for new changes
    
    This closes #21
---
 .../store/file/manifest/ManifestFileMeta.java      | 106 +++++++++++----------
 .../store/file/operation/FileStoreCommitImpl.java  |  15 ++-
 .../store/file/manifest/ManifestFileMetaTest.java  |  24 +++--
 3 files changed, 78 insertions(+), 67 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
index 0c4aa8f..cc883d3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -123,13 +124,17 @@ public class ManifestFileMeta {
     }
 
     /**
-     * Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s representing first adding and
-     * then deleting the same sst file will cancel each other.
+     * Merge several {@link ManifestFileMeta}s with several {@link ManifestEntry}s. {@link
+     * ManifestEntry}s representing first adding and then deleting the same sst file will cancel
+     * each other.
      *
      * <p>NOTE: This method is atomic.
      */
     public static List<ManifestFileMeta> merge(
-            List<ManifestFileMeta> metas, ManifestFile manifestFile, long suggestedMetaSize) {
+            List<ManifestFileMeta> metas,
+            List<ManifestEntry> entries,
+            ManifestFile manifestFile,
+            long suggestedMetaSize) {
         List<ManifestFileMeta> result = new ArrayList<>();
         // these are the newly created manifest files, clean them up if exception occurs
         List<ManifestFileMeta> newMetas = new ArrayList<>();
@@ -137,20 +142,29 @@ public class ManifestFileMeta {
         long totalSize = 0;
 
         try {
+            // merge existing manifests first
             for (ManifestFileMeta manifest : metas) {
                 totalSize += manifest.fileSize;
                 candidate.add(manifest);
                 if (totalSize >= suggestedMetaSize) {
                     // reach suggested file size, perform merging and produce new file
-                    merge(candidate, manifestFile, result, newMetas);
+                    ManifestFileMeta merged;
+                    if (candidate.size() == 1) {
+                        merged = candidate.get(0);
+                    } else {
+                        merged = mergeIntoOneFile(candidate, Collections.emptyList(), manifestFile);
+                        newMetas.add(merged);
+                    }
+                    result.add(merged);
                     candidate.clear();
                     totalSize = 0;
                 }
             }
-            if (!candidate.isEmpty()) {
-                // merge the last bit of metas
-                merge(candidate, manifestFile, result, newMetas);
-            }
+
+            // merge the remaining metas (possibly none) together with the new entries
+            ManifestFileMeta merged = mergeIntoOneFile(candidate, entries, manifestFile);
+            newMetas.add(merged);
+            result.add(merged);
         } catch (Throwable e) {
             // exception occurs, clean up and rethrow
             for (ManifestFileMeta manifest : newMetas) {
@@ -162,54 +176,44 @@ public class ManifestFileMeta {
         return result;
     }
 
-    private static void merge(
-            List<ManifestFileMeta> metas,
-            ManifestFile manifestFile,
-            List<ManifestFileMeta> result,
-            List<ManifestFileMeta> newMetas) {
-        if (metas.size() > 1) {
-            ManifestFileMeta newMeta = merge(metas, manifestFile);
-            result.add(newMeta);
-            newMetas.add(newMeta);
-        } else {
-            result.addAll(metas);
+    private static ManifestFileMeta mergeIntoOneFile(
+            List<ManifestFileMeta> metas, List<ManifestEntry> entries, ManifestFile manifestFile) {
+        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
+        for (ManifestFileMeta manifest : metas) {
+            mergeEntries(manifestFile.read(manifest.fileName), map);
         }
+        mergeEntries(entries, map);
+        return manifestFile.write(new ArrayList<>(map.values()));
     }
 
-    private static ManifestFileMeta merge(List<ManifestFileMeta> metas, ManifestFile manifestFile) {
-        Preconditions.checkArgument(
-                metas.size() > 1, "Number of ManifestFileMeta <= 1. This is a bug.");
-
-        Map<ManifestEntry.Identifier, ManifestEntry> map = new LinkedHashMap<>();
-        for (ManifestFileMeta manifest : metas) {
-            for (ManifestEntry entry : manifestFile.read(manifest.fileName)) {
-                ManifestEntry.Identifier identifier = entry.identifier();
-                switch (entry.kind()) {
-                    case ADD:
-                        Preconditions.checkState(
-                                !map.containsKey(identifier),
-                                "Trying to add file %s which is already added. Manifest might be corrupted.",
-                                identifier);
+    private static void mergeEntries(
+            List<ManifestEntry> entries, Map<ManifestEntry.Identifier, ManifestEntry> map) {
+        for (ManifestEntry entry : entries) {
+            ManifestEntry.Identifier identifier = entry.identifier();
+            switch (entry.kind()) {
+                case ADD:
+                    Preconditions.checkState(
+                            !map.containsKey(identifier),
+                            "Trying to add file %s which is already added. Manifest might be corrupted.",
+                            identifier);
+                    map.put(identifier, entry);
+                    break;
+                case DELETE:
+                    // each sst file will only be added once and deleted once,
+                    // if we know that it is added before then both add and delete entry can be
+                    // removed because there won't be further operations on this file,
+                    // otherwise we have to keep the delete entry because the add entry must be
+                    // in the previous manifest files
+                    if (map.containsKey(identifier)) {
+                        map.remove(identifier);
+                    } else {
                         map.put(identifier, entry);
-                        break;
-                    case DELETE:
-                        // each sst file will only be added once and deleted once,
-                        // if we know that it is added before then both add and delete entry can be
-                        // removed because there won't be further operations on this file,
-                        // otherwise we have to keep the delete entry because the add entry must be
-                        // in the previous manifest files
-                        if (map.containsKey(identifier)) {
-                            map.remove(identifier);
-                        } else {
-                            map.put(identifier, entry);
-                        }
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown value kind " + entry.kind().name());
-                }
+                    }
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown value kind " + entry.kind().name());
             }
         }
-        return manifestFile.write(new ArrayList<>(map.values()));
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 22eeec6..465c2ec 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -232,15 +232,14 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 if (latestSnapshot != null) {
                     // read all previous manifest files
                     oldMetas.addAll(manifestList.read(latestSnapshot.manifestList()));
-                    // merge manifest files
-                    newMetas.addAll(
-                            ManifestFileMeta.merge(
-                                    oldMetas,
-                                    manifestFile,
-                                    fileStoreOptions.manifestSuggestedSize.getBytes()));
                 }
-                // write all changes to manifest file
-                newMetas.add(manifestFile.write(changes));
+                // merge manifest files with changes
+                newMetas.addAll(
+                        ManifestFileMeta.merge(
+                                oldMetas,
+                                changes,
+                                manifestFile,
+                                fileStoreOptions.manifestSuggestedSize.getBytes()));
                 // prepare snapshot file
                 manifestListName = manifestList.write(newMetas);
                 newSnapshot =
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index 8ea0d88..cced937 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -73,10 +73,11 @@ public class ManifestFileMetaTest {
     @Test
     public void testMerge() {
         List<ManifestFileMeta> input = new ArrayList<>();
+        List<ManifestEntry> entries = new ArrayList<>();
         List<ManifestFileMeta> expected = new ArrayList<>();
-        createData(input, expected);
+        createData(input, entries, expected);
 
-        List<ManifestFileMeta> actual = ManifestFileMeta.merge(input, manifestFile, 500);
+        List<ManifestFileMeta> actual = ManifestFileMeta.merge(input, entries, manifestFile, 500);
         assertThat(actual).hasSameSizeAs(expected);
 
         // these three manifest files are merged from the input
@@ -107,13 +108,14 @@ public class ManifestFileMetaTest {
         FailingAtomicRenameFileSystem.setFailPossibility(10);
 
         List<ManifestFileMeta> input = new ArrayList<>();
-        createData(input, null);
+        List<ManifestEntry> entries = new ArrayList<>();
+        createData(input, entries, null);
         ManifestFile failingManifestFile =
                 createManifestFile(
                         FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString());
 
         try {
-            ManifestFileMeta.merge(input, failingManifestFile, 500);
+            ManifestFileMeta.merge(input, entries, failingManifestFile, 500);
         } catch (Throwable e) {
             assertThat(e)
                     .hasRootCauseExactlyInstanceOf(
@@ -144,14 +146,17 @@ public class ManifestFileMetaTest {
                 .create();
     }
 
-    private void createData(List<ManifestFileMeta> input, List<ManifestFileMeta> expected) {
+    private void createData(
+            List<ManifestFileMeta> input,
+            List<ManifestEntry> entries,
+            List<ManifestFileMeta> expected) {
         // suggested size 500
         // file sizes:
         // 200, 300, -- multiple files exactly the suggested size
         // 100, 200, 300, -- multiple files exceeding the suggested size
         // 500, -- single file exactly the suggested size
         // 600, -- single file exceeding the suggested size
-        // 100, 200 -- not enough sizes, but the last bit
+        // 100, 300 -- below the suggested size, but these are the last files
 
         input.add(makeManifest(makeEntry(true, "A"), makeEntry(true, "B")));
         input.add(makeManifest(makeEntry(true, "C"), makeEntry(false, "B"), makeEntry(true, "D")));
@@ -178,7 +183,10 @@ public class ManifestFileMetaTest {
                         makeEntry(true, "L")));
 
         input.add(makeManifest(makeEntry(true, "M")));
-        input.add(makeManifest(makeEntry(false, "M"), makeEntry(true, "N")));
+        input.add(makeManifest(makeEntry(false, "M"), makeEntry(true, "N"), makeEntry(true, "O")));
+
+        entries.add(makeEntry(false, "O"));
+        entries.add(makeEntry(true, "P"));
 
         if (expected == null) {
             return;
@@ -189,7 +197,7 @@ public class ManifestFileMetaTest {
         expected.add(makeManifest(makeEntry(false, "A"), makeEntry(true, "F")));
         expected.add(input.get(5));
         expected.add(input.get(6));
-        expected.add(makeManifest(makeEntry(true, "N")));
+        expected.add(makeManifest(makeEntry(true, "N"), makeEntry(true, "P")));
     }
 
     private ManifestFileMeta makeManifest(ManifestEntry... entries) {