You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved here.
Posted to commits@flink.apache.org by cz...@apache.org on 2022/11/07 10:35:22 UTC
[flink-table-store] branch master updated: [FLINK-29916] Fix bug that Levels in Table Store may mistakenly ignore level 0 files when two files have the same sequence number
This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 95dd0c32 [FLINK-29916] Fix bug that Levels in Table Store may mistakenly ignore level 0 files when two files have the same sequence number
95dd0c32 is described below
commit 95dd0c32da31b6528e96988b21e01126f3c24b7d
Author: tsreaper <ts...@gmail.com>
AuthorDate: Mon Nov 7 18:35:16 2022 +0800
[FLINK-29916] Fix bug that Levels in Table Store may mistakenly ignore level 0 files when two files have the same sequence number
This closes #356.
---
.../flink/table/store/file/mergetree/Levels.java | 20 +++++++++++++++++++-
.../flink/table/store/file/mergetree/LevelsTest.java | 11 +++++++++--
2 files changed, 28 insertions(+), 3 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
index 65c1b69f..f4e2087a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.mergetree;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.Comparator;
@@ -51,7 +52,19 @@ public class Levels {
inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(-1) + 1);
checkArgument(restoredMaxLevel > 1, "levels must be at least 2.");
this.level0 =
- new TreeSet<>(Comparator.comparing(DataFileMeta::maxSequenceNumber).reversed());
+ new TreeSet<>(
+ (a, b) -> {
+ if (a.maxSequenceNumber() != b.maxSequenceNumber()) {
+ // file with larger sequence number should be in front
+ return Long.compare(b.maxSequenceNumber(), a.maxSequenceNumber());
+ } else {
+ // When two or more jobs are writing the same merge tree, it is
+ // possible that multiple files have the same maxSequenceNumber. In
+ // this case we have to compare their file names so that files with
+ // same maxSequenceNumber won't be "de-duplicated" by the tree set.
+ return a.fileName().compareTo(b.fileName());
+ }
+ });
this.levels = new ArrayList<>();
for (int i = 1; i < restoredMaxLevel; i++) {
levels.add(SortedRun.empty());
@@ -62,6 +75,11 @@ public class Levels {
levelMap.computeIfAbsent(file.level(), level -> new ArrayList<>()).add(file);
}
levelMap.forEach((level, files) -> updateLevel(level, emptyList(), files));
+
+ Preconditions.checkState(
+ level0.size() + levels.stream().mapToInt(r -> r.files().size()).sum()
+ == inputFiles.size(),
+ "Number of files stored in Levels does not equal to the size of inputFiles. This is unexpected.");
}
public void addLevel0File(DataFileMeta file) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
index 03745beb..1dc3cbe2 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.UUID;
import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -43,7 +44,6 @@ public class LevelsTest {
@Test
public void testNonEmptyHighestLevel0() {
-
Levels levels = new Levels(comparator, Arrays.asList(newFile(0), newFile(0)), 3);
assertThat(levels.nonEmptyHighestLevel()).isEqualTo(0);
}
@@ -61,7 +61,14 @@ public class LevelsTest {
assertThat(levels.nonEmptyHighestLevel()).isEqualTo(2);
}
+ @Test
+ public void testLevel0WithSameSequenceNumbers() {
+ Levels levels = new Levels(comparator, Arrays.asList(newFile(0), newFile(0)), 3);
+ assertThat(levels.allFiles()).hasSize(2);
+ }
+
public static DataFileMeta newFile(int level) {
- return new DataFileMeta("", 0, 1, row(0), row(0), null, null, 0, 1, 0, level);
+ return new DataFileMeta(
+ UUID.randomUUID().toString(), 0, 1, row(0), row(0), null, null, 0, 1, 0, level);
}
}