You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/26 09:16:19 UTC

[flink-table-store] branch release-0.2 updated: [FLINK-28683] Modify changelog-file to changelog-producer

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new bfc02fd8 [FLINK-28683] Modify changelog-file to changelog-producer
bfc02fd8 is described below

commit bfc02fd8287119a9686f29a11c4a19d3e672c963
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jul 26 17:15:39 2022 +0800

    [FLINK-28683] Modify changelog-file to changelog-producer
    
    This closes #238
---
 docs/content/docs/development/streaming-query.md   |  2 +-
 .../shortcodes/generated/core_configuration.html   |  8 ++--
 .../store/connector/ContinuousFileStoreITCase.java |  2 +-
 .../org/apache/flink/table/store/CoreOptions.java  | 43 ++++++++++++++++++----
 .../store/file/mergetree/MergeTreeWriter.java      |  9 +++--
 .../file/operation/KeyValueFileStoreWrite.java     |  2 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |  3 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  9 ++---
 8 files changed, 53 insertions(+), 25 deletions(-)

diff --git a/docs/content/docs/development/streaming-query.md b/docs/content/docs/development/streaming-query.md
index b3e25b1e..99b77fb6 100644
--- a/docs/content/docs/development/streaming-query.md
+++ b/docs/content/docs/development/streaming-query.md
@@ -81,7 +81,7 @@ as follows (recommended):
 ```sql
 CREATE TABLE T (...)
 WITH (
-    'changelog-file' = 'true',
+    'changelog-producer' = 'input',
     'log.changelog-mode' = 'all'
 )
 ```
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index c985f08b..fca1e287 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -21,10 +21,10 @@
             <td>Specify the table store distribution policy. Data is assigned to each bucket according to the hash value of bucket-key.<br />If you specify multiple fields, delimiter is ','.<br />If not specified, the primary key will be used; if there is no primary key, the full row will be used.</td>
         </tr>
         <tr>
-            <td><h5>changelog-file</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to double write to a changelog file when flushing memory table. This changelog file keeps the order of data input and the details of data changes, it can be read directly during stream reads.</td>
+            <td><h5>changelog-producer</h5></td>
+            <td style="word-wrap: break-word;">none</td>
+            <td><p>Enum</p></td>
+            <td>Whether to double write to a changelog file. This changelog file keeps the details of data changes, it can be read directly during stream reads.<br /><br />Possible values:<ul><li>"none": No changelog file.</li><li>"input": Double write to a changelog file when flushing memory table, the changelog is from input.</li></ul></td>
         </tr>
         <tr>
             <td><h5>commit.force-compact</h5></td>
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
index 29632ba7..635993ca 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
@@ -50,7 +50,7 @@ public class ContinuousFileStoreITCase extends FileStoreTableITCase {
 
     @Override
     protected List<String> ddl() {
-        String options = changelogFile ? " WITH('changelog-file'='true')" : "";
+        String options = changelogFile ? " WITH('changelog-producer'='input')" : "";
         return Arrays.asList(
                 "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)" + options,
                 "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index e78dbee9..d4e261eb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -253,13 +253,13 @@ public class CoreOptions implements Serializable {
                                     + "This value avoids merging too much sorted runs at the same time during compaction, "
                                     + "which may lead to OutOfMemoryError.");
 
-    public static final ConfigOption<Boolean> CHANGELOG_FILE =
-            ConfigOptions.key("changelog-file")
-                    .booleanType()
-                    .defaultValue(false)
+    public static final ConfigOption<ChangelogProducer> CHANGELOG_PRODUCER =
+            ConfigOptions.key("changelog-producer")
+                    .enumType(ChangelogProducer.class)
+                    .defaultValue(ChangelogProducer.NONE)
                     .withDescription(
-                            "Whether to double write to a changelog file when flushing memory table. "
-                                    + "This changelog file keeps the order of data input and the details of data changes, "
+                            "Whether to double write to a changelog file. "
+                                    + "This changelog file keeps the details of data changes, "
                                     + "it can be read directly during stream reads.");
 
     public static final ConfigOption<String> SEQUENCE_FIELD =
@@ -446,8 +446,8 @@ public class CoreOptions implements Serializable {
         return options.get(COMPACTION_MAX_SORTED_RUN_NUM);
     }
 
-    public boolean enableChangelogFile() {
-        return options.get(CHANGELOG_FILE);
+    public ChangelogProducer changelogProducer() {
+        return options.get(CHANGELOG_PRODUCER);
     }
 
     public Optional<String> sequenceField() {
@@ -570,6 +570,33 @@ public class CoreOptions implements Serializable {
         }
     }
 
+    /** Specifies the changelog producer for table. */
+    public enum ChangelogProducer implements DescribedEnum {
+        NONE("none", "No changelog file."),
+
+        INPUT(
+                "input",
+                "Double write to a changelog file when flushing memory table, the changelog is from input.");
+
+        private final String value;
+        private final String description;
+
+        ChangelogProducer(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
+
     @Internal
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = CoreOptions.class.getFields();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 34f04a71..6c1161a3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file.mergetree;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.compact.CompactManager;
 import org.apache.flink.table.store.file.compact.CompactResult;
@@ -63,7 +64,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
 
     private final int numSortedRunStopTrigger;
 
-    private final boolean enableChangelogFile;
+    private final ChangelogProducer changelogProducer;
 
     private final LinkedHashSet<DataFileMeta> newFiles;
 
@@ -86,7 +87,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
             DataFileWriter dataFileWriter,
             boolean commitForceCompact,
             int numSortedRunStopTrigger,
-            boolean enableChangelogFile) {
+            ChangelogProducer changelogProducer) {
         this.keyType = keyType;
         this.valueType = valueType;
         this.compactManager = compactManager;
@@ -97,7 +98,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
         this.dataFileWriter = dataFileWriter;
         this.commitForceCompact = commitForceCompact;
         this.numSortedRunStopTrigger = numSortedRunStopTrigger;
-        this.enableChangelogFile = enableChangelogFile;
+        this.changelogProducer = changelogProducer;
         this.newFiles = new LinkedHashSet<>();
         this.compactBefore = new LinkedHashMap<>();
         this.compactAfter = new LinkedHashSet<>();
@@ -146,7 +147,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
                 finishCompaction(true);
             }
             List<String> extraFiles = new ArrayList<>();
-            if (enableChangelogFile) {
+            if (changelogProducer == ChangelogProducer.INPUT) {
                 extraFiles.add(
                         dataFileWriter
                                 .writeLevel0Changelog(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index e44f435a..9175bffa 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -151,7 +151,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
                 dataFileWriter,
                 options.commitForceCompact(),
                 options.numSortedRunStopTrigger(),
-                options.enableChangelogFile());
+                options.changelogProducer());
     }
 
     private CompactManager createCompactManager(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index f1067545..446cfc45 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowDataUtil;
 import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileReader;
@@ -278,7 +279,7 @@ public class MergeTreeTest {
                         dataFileWriter,
                         options.commitForceCompact(),
                         options.numSortedRunStopTrigger(),
-                        false);
+                        ChangelogProducer.NONE);
         writer.setMemoryPool(
                 new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
         return writer;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 24bf72e6..9b1495fe 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.table;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
@@ -157,7 +158,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
 
     @Test
     public void testStreamingChangelog() throws Exception {
-        FileStoreTable table = createFileStoreTable(true);
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, ChangelogProducer.INPUT));
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
         write.write(GenericRowData.of(1, 10, 100L));
@@ -246,10 +249,6 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                 .hasSameElementsAs(Arrays.asList("1|30|300", "1|40|400"));
     }
 
-    protected FileStoreTable createFileStoreTable(boolean changelogFile) throws Exception {
-        return createFileStoreTable(conf -> conf.set(CoreOptions.CHANGELOG_FILE, changelogFile));
-    }
-
     @Override
     protected FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
             throws Exception {