You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/18 09:06:16 UTC

[flink-table-store] branch master updated: [FLINK-25687] DropDelete is incorrect in CompactManager when outputLevel is zero

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 632044f  [FLINK-25687] DropDelete is incorrect in CompactManager when outputLevel is zero
632044f is described below

commit 632044f56f7fa2c07e9be6050ace7d5fe68302f7
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jan 18 17:05:27 2022 +0800

    [FLINK-25687] DropDelete is incorrect in CompactManager when outputLevel is zero
    
    This closes #11
---
 .../store/file/mergetree/compact/CompactManager.java | 10 +++++++++-
 .../file/mergetree/compact/CompactStrategy.java      |  9 +++++++++
 .../file/mergetree/compact/CompactManagerTest.java   | 20 +++++++++++++-------
 3 files changed, 31 insertions(+), 8 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
index f20bae4..4487d3f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
@@ -74,8 +74,16 @@ public class CompactManager {
                             if (unit.files().size() < 2) {
                                 return;
                             }
+                            /*
+                             * As long as there is no older data, we can drop the deletion.
+                             * If the output level is 0, there may be older data not involved in compaction.
+                             * If the output level is greater than 0, as long as there is no older data in
+                             * the current levels, the output is the oldest, so we can drop the deletion.
+                             * See CompactStrategy.pick.
+                             */
                             boolean dropDelete =
-                                    unit.outputLevel() >= levels.nonEmptyHighestLevel();
+                                    unit.outputLevel() != 0
+                                            && unit.outputLevel() >= levels.nonEmptyHighestLevel();
                             CompactTask task = new CompactTask(unit, dropDelete);
                             taskFuture = executor.submit(task);
                         });
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
index 27e66b6..a23dec0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
@@ -26,5 +26,14 @@ import java.util.Optional;
 /** Compact strategy to decide which files to select for compaction. */
 public interface CompactStrategy {
 
+    /**
+     * Pick a compaction unit from the given runs.
+     *
+     * <ul>
+     *   <li>Compaction is run-based, not file-based.
+     *   <li>Level 0 is special: one run per file; every other level is one run per level.
+     *   <li>Compaction is sequential, from the smallest level to the largest level.
+     * </ul>
+     */
     Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
index 560e298..49c3ca5 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.mergetree.compact;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryRowWriter;
-import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
 import org.apache.flink.table.store.file.mergetree.Levels;
 import org.apache.flink.table.store.file.mergetree.SortedRun;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
@@ -63,6 +62,18 @@ public class CompactManagerTest {
     }
 
     @Test
+    public void testOutputToZeroLevel() throws ExecutionException, InterruptedException {
+        innerTest(
+                Arrays.asList(
+                        new LevelMinMax(0, 1, 3),
+                        new LevelMinMax(0, 1, 5),
+                        new LevelMinMax(0, 1, 8)),
+                Arrays.asList(new LevelMinMax(0, 1, 8), new LevelMinMax(0, 1, 3)),
+                (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(0, runs.subList(0, 2))),
+                false);
+    }
+
+    @Test
     public void testCompactToPenultimateLayer() throws ExecutionException, InterruptedException {
         innerTest(
                 Arrays.asList(
@@ -70,12 +81,7 @@ public class CompactManagerTest {
                         new LevelMinMax(0, 1, 5),
                         new LevelMinMax(2, 1, 7)),
                 Arrays.asList(new LevelMinMax(1, 1, 5), new LevelMinMax(2, 1, 7)),
-                new CompactStrategy() {
-                    @Override
-                    public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
-                        return Optional.of(CompactUnit.fromLevelRuns(1, runs.subList(0, 2)));
-                    }
-                },
+                (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(1, runs.subList(0, 2))),
                 false);
     }