You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/26 09:15:44 UTC
[flink-table-store] branch master updated: [FLINK-28683] Modify changelog-file to changelog-producer
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new f85ba571 [FLINK-28683] Modify changelog-file to changelog-producer
f85ba571 is described below
commit f85ba571fb3aa87d1e619b091c17a8657b9a2d16
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jul 26 17:15:39 2022 +0800
[FLINK-28683] Modify changelog-file to changelog-producer
This closes #238
---
docs/content/docs/development/streaming-query.md | 2 +-
.../shortcodes/generated/core_configuration.html | 8 ++--
.../store/connector/ContinuousFileStoreITCase.java | 2 +-
.../org/apache/flink/table/store/CoreOptions.java | 43 ++++++++++++++++++----
.../store/file/mergetree/MergeTreeWriter.java | 9 +++--
.../file/operation/KeyValueFileStoreWrite.java | 2 +-
.../table/store/file/mergetree/MergeTreeTest.java | 3 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 9 ++---
8 files changed, 53 insertions(+), 25 deletions(-)
diff --git a/docs/content/docs/development/streaming-query.md b/docs/content/docs/development/streaming-query.md
index b3e25b1e..99b77fb6 100644
--- a/docs/content/docs/development/streaming-query.md
+++ b/docs/content/docs/development/streaming-query.md
@@ -81,7 +81,7 @@ as follows (recommended):
```sql
CREATE TABLE T (...)
WITH (
- 'changelog-file' = 'true',
+ 'changelog-producer' = 'input',
'log.changelog-mode' = 'all'
)
```
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index c985f08b..fca1e287 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -21,10 +21,10 @@
<td>Specify the table store distribution policy. Data is assigned to each bucket according to the hash value of bucket-key.<br />If you specify multiple fields, delimiter is ','.<br />If not specified, the primary key will be used; if there is no primary key, the full row will be used.</td>
</tr>
<tr>
- <td><h5>changelog-file</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Whether to double write to a changelog file when flushing memory table. This changelog file keeps the order of data input and the details of data changes, it can be read directly during stream reads.</td>
+ <td><h5>changelog-producer</h5></td>
+ <td style="word-wrap: break-word;">none</td>
+ <td><p>Enum</p></td>
+ <td>Whether to double write to a changelog file. This changelog file keeps the details of data changes, it can be read directly during stream reads.<br /><br />Possible values:<ul><li>"none": No changelog file.</li><li>"input": Double write to a changelog file when flushing memory table, the changelog is from input.</li></ul></td>
</tr>
<tr>
<td><h5>commit.force-compact</h5></td>
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
index 29632ba7..635993ca 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
@@ -50,7 +50,7 @@ public class ContinuousFileStoreITCase extends FileStoreTableITCase {
@Override
protected List<String> ddl() {
- String options = changelogFile ? " WITH('changelog-file'='true')" : "";
+ String options = changelogFile ? " WITH('changelog-producer'='input')" : "";
return Arrays.asList(
"CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)" + options,
"CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index e78dbee9..d4e261eb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -253,13 +253,13 @@ public class CoreOptions implements Serializable {
+ "This value avoids merging too much sorted runs at the same time during compaction, "
+ "which may lead to OutOfMemoryError.");
- public static final ConfigOption<Boolean> CHANGELOG_FILE =
- ConfigOptions.key("changelog-file")
- .booleanType()
- .defaultValue(false)
+ public static final ConfigOption<ChangelogProducer> CHANGELOG_PRODUCER =
+ ConfigOptions.key("changelog-producer")
+ .enumType(ChangelogProducer.class)
+ .defaultValue(ChangelogProducer.NONE)
.withDescription(
- "Whether to double write to a changelog file when flushing memory table. "
- + "This changelog file keeps the order of data input and the details of data changes, "
+ "Whether to double write to a changelog file. "
+ + "This changelog file keeps the details of data changes, "
+ "it can be read directly during stream reads.");
public static final ConfigOption<String> SEQUENCE_FIELD =
@@ -446,8 +446,8 @@ public class CoreOptions implements Serializable {
return options.get(COMPACTION_MAX_SORTED_RUN_NUM);
}
- public boolean enableChangelogFile() {
- return options.get(CHANGELOG_FILE);
+ public ChangelogProducer changelogProducer() {
+ return options.get(CHANGELOG_PRODUCER);
}
public Optional<String> sequenceField() {
@@ -570,6 +570,33 @@ public class CoreOptions implements Serializable {
}
}
+ /** Specifies whether and how a changelog file is produced for a table. */
+ public enum ChangelogProducer implements DescribedEnum {
+ NONE("none", "No changelog file."),
+
+ INPUT(
+ "input",
+ "Double write to a changelog file when flushing memory table, the changelog is from input.");
+
+ private final String value;
+ private final String description;
+
+ ChangelogProducer(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
+
+
@Internal
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = CoreOptions.class.getFields();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 34f04a71..6c1161a3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file.mergetree;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.compact.CompactManager;
import org.apache.flink.table.store.file.compact.CompactResult;
@@ -63,7 +64,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
private final int numSortedRunStopTrigger;
- private final boolean enableChangelogFile;
+ private final ChangelogProducer changelogProducer;
private final LinkedHashSet<DataFileMeta> newFiles;
@@ -86,7 +87,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
DataFileWriter dataFileWriter,
boolean commitForceCompact,
int numSortedRunStopTrigger,
- boolean enableChangelogFile) {
+ ChangelogProducer changelogProducer) {
this.keyType = keyType;
this.valueType = valueType;
this.compactManager = compactManager;
@@ -97,7 +98,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
this.dataFileWriter = dataFileWriter;
this.commitForceCompact = commitForceCompact;
this.numSortedRunStopTrigger = numSortedRunStopTrigger;
- this.enableChangelogFile = enableChangelogFile;
+ this.changelogProducer = changelogProducer;
this.newFiles = new LinkedHashSet<>();
this.compactBefore = new LinkedHashMap<>();
this.compactAfter = new LinkedHashSet<>();
@@ -146,7 +147,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
finishCompaction(true);
}
List<String> extraFiles = new ArrayList<>();
- if (enableChangelogFile) {
+ if (changelogProducer == ChangelogProducer.INPUT) {
extraFiles.add(
dataFileWriter
.writeLevel0Changelog(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index e44f435a..9175bffa 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -151,7 +151,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
dataFileWriter,
options.commitForceCompact(),
options.numSortedRunStopTrigger(),
- options.enableChangelogFile());
+ options.changelogProducer());
}
private CompactManager createCompactManager(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index f1067545..446cfc45 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileReader;
@@ -278,7 +279,7 @@ public class MergeTreeTest {
dataFileWriter,
options.commitForceCompact(),
options.numSortedRunStopTrigger(),
- false);
+ ChangelogProducer.NONE);
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
return writer;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 24bf72e6..9b1495fe 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.table;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
@@ -157,7 +158,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
@Test
public void testStreamingChangelog() throws Exception {
- FileStoreTable table = createFileStoreTable(true);
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, ChangelogProducer.INPUT));
TableWrite write = table.newWrite();
TableCommit commit = table.newCommit("user");
write.write(GenericRowData.of(1, 10, 100L));
@@ -246,10 +249,6 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
.hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
}
- protected FileStoreTable createFileStoreTable(boolean changelogFile) throws Exception {
- return createFileStoreTable(conf -> conf.set(CoreOptions.CHANGELOG_FILE, changelogFile));
- }
-
@Override
protected FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
throws Exception {