You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/18 09:06:16 UTC
[flink-table-store] branch master updated: [FLINK-25687] DropDelete is incorrect in CompactManager when outputLevel is zero
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 632044f [FLINK-25687] DropDelete is incorrect in CompactManager when outputLevel is zero
632044f is described below
commit 632044f56f7fa2c07e9be6050ace7d5fe68302f7
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jan 18 17:05:27 2022 +0800
[FLINK-25687] DropDelete is incorrect in CompactManager when outputLevel is zero
This closes #11
---
.../store/file/mergetree/compact/CompactManager.java | 10 +++++++++-
.../file/mergetree/compact/CompactStrategy.java | 9 +++++++++
.../file/mergetree/compact/CompactManagerTest.java | 20 +++++++++++++-------
3 files changed, 31 insertions(+), 8 deletions(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
index f20bae4..4487d3f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
@@ -74,8 +74,16 @@ public class CompactManager {
if (unit.files().size() < 2) {
return;
}
+ /*
+ * As long as there is no older data, we can drop the deletion.
+ * If the output level is 0, there may be older data not involved in compaction.
+ * If the output level is bigger than 0, as long as there is no older data in
+ * the current levels, the output is the oldest, so we can drop the deletion.
+ * See CompactStrategy.pick.
+ */
boolean dropDelete =
- unit.outputLevel() >= levels.nonEmptyHighestLevel();
+ unit.outputLevel() != 0
+ && unit.outputLevel() >= levels.nonEmptyHighestLevel();
CompactTask task = new CompactTask(unit, dropDelete);
taskFuture = executor.submit(task);
});
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
index 27e66b6..a23dec0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
@@ -26,5 +26,14 @@ import java.util.Optional;
/** Compact strategy to decide which files to select for compaction. */
public interface CompactStrategy {
+ /**
+ * Pick a compaction unit from runs.
+ *
+ * <ul>
+ * <li>compaction is runs-based, not file-based.
+ * <li>level 0 is special, one run per file; all other levels are one run per level.
+ * <li>compaction is sequential from small level to large level.
+ * </ul>
+ */
Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
index 560e298..49c3ca5 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.mergetree.compact;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
-import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
import org.apache.flink.table.store.file.mergetree.Levels;
import org.apache.flink.table.store.file.mergetree.SortedRun;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
@@ -63,6 +62,18 @@ public class CompactManagerTest {
}
@Test
+ public void testOutputToZeroLevel() throws ExecutionException, InterruptedException {
+ innerTest(
+ Arrays.asList(
+ new LevelMinMax(0, 1, 3),
+ new LevelMinMax(0, 1, 5),
+ new LevelMinMax(0, 1, 8)),
+ Arrays.asList(new LevelMinMax(0, 1, 8), new LevelMinMax(0, 1, 3)),
+ (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(0, runs.subList(0, 2))),
+ false);
+ }
+
+ @Test
public void testCompactToPenultimateLayer() throws ExecutionException, InterruptedException {
innerTest(
Arrays.asList(
@@ -70,12 +81,7 @@ public class CompactManagerTest {
new LevelMinMax(0, 1, 5),
new LevelMinMax(2, 1, 7)),
Arrays.asList(new LevelMinMax(1, 1, 5), new LevelMinMax(2, 1, 7)),
- new CompactStrategy() {
- @Override
- public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
- return Optional.of(CompactUnit.fromLevelRuns(1, runs.subList(0, 2)));
- }
- },
+ (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(1, runs.subList(0, 2))),
false);
}