You are viewing a plain-text version of this content. (The link to the canonical version, originally referenced here, was not preserved in this copy.)
Posted to commits@flink.apache.org by cz...@apache.org on 2022/11/07 10:35:22 UTC

[flink-table-store] branch master updated: [FLINK-29916] Fix bug that Levels in Table Store may mistakenly ignore level 0 files when two files have the same sequence number

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 95dd0c32 [FLINK-29916] Fix bug that Levels in Table Store may mistakenly ignore level 0 files when two files have the same sequence number
95dd0c32 is described below

commit 95dd0c32da31b6528e96988b21e01126f3c24b7d
Author: tsreaper <ts...@gmail.com>
AuthorDate: Mon Nov 7 18:35:16 2022 +0800

    [FLINK-29916] Fix bug that Levels in Table Store may mistakenly ignore level 0 files when two files have the same sequence number
    
    This closes #356.
---
 .../flink/table/store/file/mergetree/Levels.java     | 20 +++++++++++++++++++-
 .../flink/table/store/file/mergetree/LevelsTest.java | 11 +++++++++--
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
index 65c1b69f..f4e2087a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.mergetree;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -51,7 +52,19 @@ public class Levels {
                         inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(-1) + 1);
         checkArgument(restoredMaxLevel > 1, "levels must be at least 2.");
         this.level0 =
-                new TreeSet<>(Comparator.comparing(DataFileMeta::maxSequenceNumber).reversed());
+                new TreeSet<>(
+                        (a, b) -> {
+                            if (a.maxSequenceNumber() != b.maxSequenceNumber()) {
+                                // file with larger sequence number should be in front
+                                return Long.compare(b.maxSequenceNumber(), a.maxSequenceNumber());
+                            } else {
+                                // When two or more jobs are writing the same merge tree, it is
+                                // possible that multiple files have the same maxSequenceNumber. In
+                                // this case we have to compare their file names so that files with
+                                // same maxSequenceNumber won't be "de-duplicated" by the tree set.
+                                return a.fileName().compareTo(b.fileName());
+                            }
+                        });
         this.levels = new ArrayList<>();
         for (int i = 1; i < restoredMaxLevel; i++) {
             levels.add(SortedRun.empty());
@@ -62,6 +75,11 @@ public class Levels {
             levelMap.computeIfAbsent(file.level(), level -> new ArrayList<>()).add(file);
         }
         levelMap.forEach((level, files) -> updateLevel(level, emptyList(), files));
+
+        Preconditions.checkState(
+                level0.size() + levels.stream().mapToInt(r -> r.files().size()).sum()
+                        == inputFiles.size(),
+                "Number of files stored in Levels does not equal to the size of inputFiles. This is unexpected.");
     }
 
     public void addLevel0File(DataFileMeta file) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
index 03745beb..1dc3cbe2 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.UUID;
 
 import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -43,7 +44,6 @@ public class LevelsTest {
 
     @Test
     public void testNonEmptyHighestLevel0() {
-
         Levels levels = new Levels(comparator, Arrays.asList(newFile(0), newFile(0)), 3);
         assertThat(levels.nonEmptyHighestLevel()).isEqualTo(0);
     }
@@ -61,7 +61,14 @@ public class LevelsTest {
         assertThat(levels.nonEmptyHighestLevel()).isEqualTo(2);
     }
 
+    @Test
+    public void testLevel0WithSameSequenceNumbers() {
+        Levels levels = new Levels(comparator, Arrays.asList(newFile(0), newFile(0)), 3);
+        assertThat(levels.allFiles()).hasSize(2);
+    }
+
     public static DataFileMeta newFile(int level) {
-        return new DataFileMeta("", 0, 1, row(0), row(0), null, null, 0, 1, 0, level);
+        return new DataFileMeta(
+                UUID.randomUUID().toString(), 0, 1, row(0), row(0), null, null, 0, 1, 0, level);
     }
 }