You are viewing a plain-text version of this content. The hyperlink to its canonical version is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/20 10:20:55 UTC
[flink-table-store] branch release-0.2 updated: [FLINK-28482] num-sorted-run.stop-trigger introduced a unstable merging
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new 95bb7a54 [FLINK-28482] num-sorted-run.stop-trigger introduced a unstable merging
95bb7a54 is described below
commit 95bb7a54d62e6b02492d33381cfb9d73ee2665fe
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Wed Jul 20 18:19:35 2022 +0800
[FLINK-28482] num-sorted-run.stop-trigger introduced a unstable merging
This closes #216
---
.../shortcodes/generated/core_configuration.html | 8 +++-
.../org/apache/flink/table/store/CoreOptions.java | 17 +++++++-
.../mergetree/compact/UniversalCompaction.java | 13 ++++--
.../file/operation/KeyValueFileStoreWrite.java | 3 +-
.../table/store/file/mergetree/MergeTreeTest.java | 3 +-
.../mergetree/compact/UniversalCompactionTest.java | 49 +++++++++++++++++-----
6 files changed, 75 insertions(+), 18 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 09cdf87e..c985f08b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -44,6 +44,12 @@
<td>Integer</td>
<td>The size amplification is defined as the amount (in percentage) of additional storage needed to store a single byte of data in the merge tree for changelog mode table.</td>
</tr>
+ <tr>
+ <td><h5>compaction.max-sorted-run-num</h5></td>
+ <td style="word-wrap: break-word;">2147483647</td>
+ <td>Integer</td>
+ <td>The maximum sorted run number to pick for compaction. This value avoids merging too much sorted runs at the same time during compaction, which may lead to OutOfMemoryError.</td>
+ </tr>
<tr>
<td><h5>compaction.min.file-num</h5></td>
<td style="word-wrap: break-word;">5</td>
@@ -150,7 +156,7 @@
<td><h5>num-sorted-run.stop-trigger</h5></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
- <td>The number of sorted-runs that trigger the stopping of writes.</td>
+ <td>The number of sorted runs that trigger the stopping of writes.</td>
</tr>
<tr>
<td><h5>page-size</h5></td>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 3cc3ff27..8a7a5362 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -193,7 +193,7 @@ public class CoreOptions implements Serializable {
.intType()
.defaultValue(10)
.withDescription(
- "The number of sorted-runs that trigger the stopping of writes.");
+ "The number of sorted runs that trigger the stopping of writes.");
public static final ConfigOption<Integer> NUM_LEVELS =
ConfigOptions.key("num-levels")
@@ -244,6 +244,15 @@ public class CoreOptions implements Serializable {
+ "for append-only table, even if sum(size(f_i)) < targetFileSize. This value "
+ "avoids pending too much small files, which slows down the performance.");
+ public static final ConfigOption<Integer> COMPACTION_MAX_SORTED_RUN_NUM =
+ ConfigOptions.key("compaction.max-sorted-run-num")
+ .intType()
+ .defaultValue(Integer.MAX_VALUE)
+ .withDescription(
+ "The maximum sorted run number to pick for compaction. "
+ + "This value avoids merging too much sorted runs at the same time during compaction, "
+ + "which may lead to OutOfMemoryError.");
+
public static final ConfigOption<Boolean> CHANGELOG_FILE =
ConfigOptions.key("changelog-file")
.booleanType()
@@ -405,7 +414,7 @@ public class CoreOptions implements Serializable {
// By default, this ensures that the compaction does not fall to level 0, but at least to
// level 1
Integer numLevels = options.get(NUM_LEVELS);
- numLevels = numLevels == null ? numSortedRunCompactionTrigger() + 1 : numLevels;
+ numLevels = numLevels == null ? numSortedRunStopTrigger() + 1 : numLevels;
return numLevels;
}
@@ -429,6 +438,10 @@ public class CoreOptions implements Serializable {
return options.get(COMPACTION_MAX_FILE_NUM);
}
+ public int maxSortedRunNum() {
+ return options.get(COMPACTION_MAX_SORTED_RUN_NUM);
+ }
+
public boolean enableChangelogFile() {
return options.get(CHANGELOG_FILE);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
index f4217431..ce5eac10 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
@@ -43,11 +43,14 @@ public class UniversalCompaction implements CompactStrategy {
private final int maxSizeAmp;
private final int sizeRatio;
private final int numRunCompactionTrigger;
+ private final int maxSortedRunNum;
- public UniversalCompaction(int maxSizeAmp, int sizeRatio, int numRunCompactionTrigger) {
+ public UniversalCompaction(
+ int maxSizeAmp, int sizeRatio, int numRunCompactionTrigger, int maxSortedRunNum) {
this.maxSizeAmp = maxSizeAmp;
this.sizeRatio = sizeRatio;
this.numRunCompactionTrigger = numRunCompactionTrigger;
+ this.maxSortedRunNum = maxSortedRunNum;
}
@Override
@@ -130,7 +133,7 @@ public class UniversalCompaction implements CompactStrategy {
}
if (candidateCount > 1) {
- return createUnit(runs, maxLevel, candidateCount);
+ return createUnit(runs, maxLevel, candidateCount, maxSortedRunNum);
}
return null;
@@ -145,8 +148,12 @@ public class UniversalCompaction implements CompactStrategy {
}
@VisibleForTesting
- static CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {
+ static CompactUnit createUnit(
+ List<LevelSortedRun> runs, int maxLevel, int runCount, int maxSortedRunNum) {
int outputLevel;
+ if (runCount > maxSortedRunNum) {
+ runCount = maxSortedRunNum;
+ }
if (runCount == runs.size()) {
outputLevel = maxLevel;
} else {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 574c7eb0..e44f435a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -140,7 +140,8 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
new UniversalCompaction(
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
- options.numSortedRunCompactionTrigger()),
+ options.numSortedRunCompactionTrigger(),
+ options.maxSortedRunNum()),
compactExecutor,
levels),
levels,
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 8a5793d4..f1067545 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -292,7 +292,8 @@ public class MergeTreeTest {
new UniversalCompaction(
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
- options.numSortedRunCompactionTrigger());
+ options.numSortedRunCompactionTrigger(),
+ options.maxSortedRunNum());
CompactRewriter rewriter =
(outputLevel, dropDelete, sections) ->
dataFileWriter.write(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
index a42ede9c..6a8b3793 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
@@ -38,16 +38,21 @@ public class UniversalCompactionTest {
@Test
public void testOutputLevel() {
- assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 1).outputLevel()).isEqualTo(0);
- assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 2).outputLevel()).isEqualTo(0);
- assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 3).outputLevel()).isEqualTo(2);
- assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 4).outputLevel()).isEqualTo(3);
- assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 5).outputLevel()).isEqualTo(5);
+ assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 1, Integer.MAX_VALUE).outputLevel())
+ .isEqualTo(0);
+ assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 2, Integer.MAX_VALUE).outputLevel())
+ .isEqualTo(0);
+ assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 3, Integer.MAX_VALUE).outputLevel())
+ .isEqualTo(2);
+ assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 4, Integer.MAX_VALUE).outputLevel())
+ .isEqualTo(3);
+ assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 5, Integer.MAX_VALUE).outputLevel())
+ .isEqualTo(5);
}
@Test
public void testPick() {
- UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);
+ UniversalCompaction compaction = new UniversalCompaction(25, 1, 3, Integer.MAX_VALUE);
// by size amplification
Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 3, 3));
@@ -69,9 +74,33 @@ public class UniversalCompactionTest {
assertThat(results).isEqualTo(new long[] {1, 2, 3});
}
+ @Test
+ public void testPickWithMaxSortedRunNum() {
+ UniversalCompaction compaction = new UniversalCompaction(25, 1, 3, 2);
+
+ // by size amplification
+ Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 3, 3));
+ assertThat(pick.isPresent()).isTrue();
+ long[] results = pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+ assertThat(results).isEqualTo(new long[] {1, 2, 3, 3});
+
+ // by size ratio
+ pick = compaction.pick(3, level0(1, 1, 1, 50));
+ assertThat(pick.isPresent()).isTrue();
+ results = pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+ assertThat(results).isEqualTo(new long[] {1, 1});
+
+ // by file num
+ pick = compaction.pick(3, level0(1, 2, 3, 50));
+ assertThat(pick.isPresent()).isTrue();
+ results = pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+ // 3 should be in the candidate, by size ratio after picking by file num
+ assertThat(results).isEqualTo(new long[] {1, 2});
+ }
+
@Test
public void testSizeAmplification() {
- UniversalCompaction compaction = new UniversalCompaction(25, 0, 1);
+ UniversalCompaction compaction = new UniversalCompaction(25, 0, 1, Integer.MAX_VALUE);
long[] sizes = new long[] {1};
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {2});
@@ -111,7 +140,7 @@ public class UniversalCompactionTest {
@Test
public void testSizeRatio() {
- UniversalCompaction compaction = new UniversalCompaction(25, 1, 5);
+ UniversalCompaction compaction = new UniversalCompaction(25, 1, 5, Integer.MAX_VALUE);
long[] sizes = new long[] {1, 1, 1, 1};
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {5});
@@ -164,9 +193,9 @@ public class UniversalCompactionTest {
@Test
public void testSizeRatioThreshold() {
long[] sizes = new long[] {8, 9, 10};
- assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2), sizes))
+ assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2, Integer.MAX_VALUE), sizes))
.isEqualTo(new long[] {8, 9, 10});
- assertThat(pickForSizeRatio(new UniversalCompaction(25, 20, 2), sizes))
+ assertThat(pickForSizeRatio(new UniversalCompaction(25, 20, 2, Integer.MAX_VALUE), sizes))
.isEqualTo(new long[] {27});
}