Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/20 10:20:55 UTC

[flink-table-store] branch release-0.2 updated: [FLINK-28482] num-sorted-run.stop-trigger introduced unstable merging

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 95bb7a54 [FLINK-28482] num-sorted-run.stop-trigger introduced unstable merging
95bb7a54 is described below

commit 95bb7a54d62e6b02492d33381cfb9d73ee2665fe
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Wed Jul 20 18:19:35 2022 +0800

    [FLINK-28482] num-sorted-run.stop-trigger introduced unstable merging
    
    This closes #216
---
 .../shortcodes/generated/core_configuration.html   |  8 +++-
 .../org/apache/flink/table/store/CoreOptions.java  | 17 +++++++-
 .../mergetree/compact/UniversalCompaction.java     | 13 ++++--
 .../file/operation/KeyValueFileStoreWrite.java     |  3 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |  3 +-
 .../mergetree/compact/UniversalCompactionTest.java | 49 +++++++++++++++++-----
 6 files changed, 75 insertions(+), 18 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 09cdf87e..c985f08b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -44,6 +44,12 @@
             <td>Integer</td>
             <td>The size amplification is defined as the amount (as a percentage) of additional storage needed to store a single byte of data in the merge tree for a changelog mode table.</td>
         </tr>
+        <tr>
+            <td><h5>compaction.max-sorted-run-num</h5></td>
+            <td style="word-wrap: break-word;">2147483647</td>
+            <td>Integer</td>
+            <td>The maximum number of sorted runs to pick for compaction. This value avoids merging too many sorted runs at the same time during compaction, which may lead to OutOfMemoryError.</td>
+        </tr>
         <tr>
             <td><h5>compaction.min.file-num</h5></td>
             <td style="word-wrap: break-word;">5</td>
@@ -150,7 +156,7 @@
             <td><h5>num-sorted-run.stop-trigger</h5></td>
             <td style="word-wrap: break-word;">10</td>
             <td>Integer</td>
-            <td>The number of sorted-runs that trigger the stopping of writes.</td>
+            <td>The number of sorted runs that trigger the stopping of writes.</td>
         </tr>
         <tr>
             <td><h5>page-size</h5></td>
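
A note on the generated table above: only the two option keys and their defaults come from this repository. The snippet below is a hedged sketch of supplying them together as plain string table properties; the property map and its use are illustrative assumptions, not part of this commit. Per the new description, keeping the cap finite is what stops a single compaction from merging an unbounded number of sorted runs.

    import java.util.HashMap;
    import java.util.Map;

    public class SortedRunOptionsSketch {
        public static void main(String[] args) {
            // Hypothetical property map; the keys below are taken from the docs table above.
            Map<String, String> tableOptions = new HashMap<>();
            tableOptions.put("num-sorted-run.stop-trigger", "10");   // default: stop writes once 10 sorted runs pile up
            tableOptions.put("compaction.max-sorted-run-num", "5");  // cap each compaction at 5 sorted runs (default 2147483647, i.e. effectively unlimited)
            tableOptions.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }
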
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 3cc3ff27..8a7a5362 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -193,7 +193,7 @@ public class CoreOptions implements Serializable {
                     .intType()
                     .defaultValue(10)
                     .withDescription(
-                            "The number of sorted-runs that trigger the stopping of writes.");
+                            "The number of sorted runs that trigger the stopping of writes.");
 
     public static final ConfigOption<Integer> NUM_LEVELS =
             ConfigOptions.key("num-levels")
@@ -244,6 +244,15 @@ public class CoreOptions implements Serializable {
                                     + "for append-only table, even if sum(size(f_i)) < targetFileSize. This value "
                                     + "avoids pending too much small files, which slows down the performance.");
 
+    public static final ConfigOption<Integer> COMPACTION_MAX_SORTED_RUN_NUM =
+            ConfigOptions.key("compaction.max-sorted-run-num")
+                    .intType()
+                    .defaultValue(Integer.MAX_VALUE)
+                    .withDescription(
+                            "The maximum number of sorted runs to pick for compaction. "
+                                    + "This value avoids merging too many sorted runs at the same time during compaction, "
+                                    + "which may lead to OutOfMemoryError.");
+
     public static final ConfigOption<Boolean> CHANGELOG_FILE =
             ConfigOptions.key("changelog-file")
                     .booleanType()
@@ -405,7 +414,7 @@ public class CoreOptions implements Serializable {
         // By default, this ensures that the compaction does not fall to level 0, but at least to
         // level 1
         Integer numLevels = options.get(NUM_LEVELS);
-        numLevels = numLevels == null ? numSortedRunCompactionTrigger() + 1 : numLevels;
+        numLevels = numLevels == null ? numSortedRunStopTrigger() + 1 : numLevels;
         return numLevels;
     }
 
@@ -429,6 +438,10 @@ public class CoreOptions implements Serializable {
         return options.get(COMPACTION_MAX_FILE_NUM);
     }
 
+    public int maxSortedRunNum() {
+        return options.get(COMPACTION_MAX_SORTED_RUN_NUM);
+    }
+
     public boolean enableChangelogFile() {
         return options.get(CHANGELOG_FILE);
     }
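
The hunk above only declares the option and its accessor. As a minimal, hedged sketch (not part of the patch), here is how the new typed option could be set and read through a plain Flink Configuration; everything except the option constant itself is illustrative.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.CoreOptions;

    public class MaxSortedRunNumSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Cap a single compaction at 5 sorted runs instead of the effectively
            // unlimited default (Integer.MAX_VALUE, i.e. 2147483647 in the docs table).
            conf.set(CoreOptions.COMPACTION_MAX_SORTED_RUN_NUM, 5);
            // Prints 5; without the explicit set it would print 2147483647.
            System.out.println(conf.get(CoreOptions.COMPACTION_MAX_SORTED_RUN_NUM));
        }
    }
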
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
index f4217431..ce5eac10 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
@@ -43,11 +43,14 @@ public class UniversalCompaction implements CompactStrategy {
     private final int maxSizeAmp;
     private final int sizeRatio;
     private final int numRunCompactionTrigger;
+    private final int maxSortedRunNum;
 
-    public UniversalCompaction(int maxSizeAmp, int sizeRatio, int numRunCompactionTrigger) {
+    public UniversalCompaction(
+            int maxSizeAmp, int sizeRatio, int numRunCompactionTrigger, int maxSortedRunNum) {
         this.maxSizeAmp = maxSizeAmp;
         this.sizeRatio = sizeRatio;
         this.numRunCompactionTrigger = numRunCompactionTrigger;
+        this.maxSortedRunNum = maxSortedRunNum;
     }
 
     @Override
@@ -130,7 +133,7 @@ public class UniversalCompaction implements CompactStrategy {
         }
 
         if (candidateCount > 1) {
-            return createUnit(runs, maxLevel, candidateCount);
+            return createUnit(runs, maxLevel, candidateCount, maxSortedRunNum);
         }
 
         return null;
@@ -145,8 +148,12 @@ public class UniversalCompaction implements CompactStrategy {
     }
 
     @VisibleForTesting
-    static CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {
+    static CompactUnit createUnit(
+            List<LevelSortedRun> runs, int maxLevel, int runCount, int maxSortedRunNum) {
         int outputLevel;
+        if (runCount > maxSortedRunNum) {
+            runCount = maxSortedRunNum;
+        }
         if (runCount == runs.size()) {
             outputLevel = maxLevel;
         } else {
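
To make the new branch easier to follow in isolation, here is a standalone, hedged sketch of the output-level decision with the cap applied. Only the clamp and the runCount == runs.size() case appear in the hunk above; the final else expression and all names in the sketch are assumptions made for illustration.

    public class CreateUnitCapSketch {
        // runLevels[i] stands in for runs.get(i).level(), in the same order as the runs list.
        static int outputLevelFor(int runCount, int[] runLevels, int maxLevel, int maxSortedRunNum) {
            // The clamp added by this commit: never merge more than maxSortedRunNum runs at once.
            runCount = Math.min(runCount, maxSortedRunNum);
            if (runCount == runLevels.length) {
                return maxLevel; // merging every run: the result goes to the maximum level
            }
            // Otherwise the output level follows from the first run left out of the merge;
            // this expression is an assumption, since the real else branch sits outside the hunk.
            return Math.max(0, runLevels[runCount] - 1);
        }

        public static void main(String[] args) {
            int[] levels = {0, 0, 1, 3, 4};
            // With the default cap (Integer.MAX_VALUE) this reproduces the expected levels
            // 0, 0, 2, 3, 5 asserted in testOutputLevel later in this patch.
            for (int runCount = 1; runCount <= 5; runCount++) {
                System.out.print(outputLevelFor(runCount, levels, 5, Integer.MAX_VALUE) + " ");
            }
            System.out.println();
            // With a cap of 2, a request to merge all 5 runs is reduced to the first two,
            // so the output level drops from 5 (maxLevel) to 0.
            System.out.println(outputLevelFor(5, levels, 5, 2));
        }
    }
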
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 574c7eb0..e44f435a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -140,7 +140,8 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
                         new UniversalCompaction(
                                 options.maxSizeAmplificationPercent(),
                                 options.sortedRunSizeRatio(),
-                                options.numSortedRunCompactionTrigger()),
+                                options.numSortedRunCompactionTrigger(),
+                                options.maxSortedRunNum()),
                         compactExecutor,
                         levels),
                 levels,
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 8a5793d4..f1067545 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -292,7 +292,8 @@ public class MergeTreeTest {
                 new UniversalCompaction(
                         options.maxSizeAmplificationPercent(),
                         options.sortedRunSizeRatio(),
-                        options.numSortedRunCompactionTrigger());
+                        options.numSortedRunCompactionTrigger(),
+                        options.maxSortedRunNum());
         CompactRewriter rewriter =
                 (outputLevel, dropDelete, sections) ->
                         dataFileWriter.write(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
index a42ede9c..6a8b3793 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
@@ -38,16 +38,21 @@ public class UniversalCompactionTest {
 
     @Test
     public void testOutputLevel() {
-        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 1).outputLevel()).isEqualTo(0);
-        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 2).outputLevel()).isEqualTo(0);
-        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 3).outputLevel()).isEqualTo(2);
-        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 4).outputLevel()).isEqualTo(3);
-        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 5).outputLevel()).isEqualTo(5);
+        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 1, Integer.MAX_VALUE).outputLevel())
+                .isEqualTo(0);
+        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 2, Integer.MAX_VALUE).outputLevel())
+                .isEqualTo(0);
+        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 3, Integer.MAX_VALUE).outputLevel())
+                .isEqualTo(2);
+        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 4, Integer.MAX_VALUE).outputLevel())
+                .isEqualTo(3);
+        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 5, Integer.MAX_VALUE).outputLevel())
+                .isEqualTo(5);
     }
 
     @Test
     public void testPick() {
-        UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);
+        UniversalCompaction compaction = new UniversalCompaction(25, 1, 3, Integer.MAX_VALUE);
 
         // by size amplification
         Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 3, 3));
@@ -69,9 +74,33 @@ public class UniversalCompactionTest {
         assertThat(results).isEqualTo(new long[] {1, 2, 3});
     }
 
+    @Test
+    public void testPickWithMaxSortedRunNum() {
+        UniversalCompaction compaction = new UniversalCompaction(25, 1, 3, 2);
+
+        // by size amplification
+        Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 3, 3));
+        assertThat(pick.isPresent()).isTrue();
+        long[] results = pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+        assertThat(results).isEqualTo(new long[] {1, 2, 3, 3});
+
+        // by size ratio
+        pick = compaction.pick(3, level0(1, 1, 1, 50));
+        assertThat(pick.isPresent()).isTrue();
+        results = pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+        assertThat(results).isEqualTo(new long[] {1, 1});
+
+        // by file num
+        pick = compaction.pick(3, level0(1, 2, 3, 50));
+        assertThat(pick.isPresent()).isTrue();
+        results = pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+        // 3 would join the candidate by size ratio after picking by file num, but the cap of 2 limits the pick to the first two runs
+        assertThat(results).isEqualTo(new long[] {1, 2});
+    }
+
     @Test
     public void testSizeAmplification() {
-        UniversalCompaction compaction = new UniversalCompaction(25, 0, 1);
+        UniversalCompaction compaction = new UniversalCompaction(25, 0, 1, Integer.MAX_VALUE);
         long[] sizes = new long[] {1};
         sizes = appendAndPickForSizeAmp(compaction, sizes);
         assertThat(sizes).isEqualTo(new long[] {2});
@@ -111,7 +140,7 @@ public class UniversalCompactionTest {
 
     @Test
     public void testSizeRatio() {
-        UniversalCompaction compaction = new UniversalCompaction(25, 1, 5);
+        UniversalCompaction compaction = new UniversalCompaction(25, 1, 5, Integer.MAX_VALUE);
         long[] sizes = new long[] {1, 1, 1, 1};
         sizes = appendAndPickForSizeRatio(compaction, sizes);
         assertThat(sizes).isEqualTo(new long[] {5});
@@ -164,9 +193,9 @@ public class UniversalCompactionTest {
     @Test
     public void testSizeRatioThreshold() {
         long[] sizes = new long[] {8, 9, 10};
-        assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2), sizes))
+        assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2, Integer.MAX_VALUE), sizes))
                 .isEqualTo(new long[] {8, 9, 10});
-        assertThat(pickForSizeRatio(new UniversalCompaction(25, 20, 2), sizes))
+        assertThat(pickForSizeRatio(new UniversalCompaction(25, 20, 2, Integer.MAX_VALUE), sizes))
                 .isEqualTo(new long[] {27});
     }