You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/08/23 03:11:54 UTC

[flink-table-store] branch release-0.2 updated: [FLINK-29070] Provide an option to force the removal of the normalize node in streaming reads

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new fa11e874 [FLINK-29070] Provide an option to force the removal of the normalize node in streaming reads
fa11e874 is described below

commit fa11e8748c763fcc897f0b688c9d2e6d7c1334a3
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Aug 23 11:11:49 2022 +0800

    [FLINK-29070] Provide an option to force the removal of the normalize node in streaming reads
    
    This closes #273
---
 docs/layouts/shortcodes/generated/core_configuration.html        | 6 ++++++
 .../flink/table/store/connector/source/TableStoreSource.java     | 5 +++++
 .../src/main/java/org/apache/flink/table/store/CoreOptions.java  | 9 +++++++++
 3 files changed, 20 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 36eea4d7..b093a1a4 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -110,6 +110,12 @@
             <td><p>Enum</p></td>
             <td>Specify the startup mode for log consumer.<br /><br />Possible values:<ul><li>"full": Perform a snapshot on the table upon first startup, and continue to read the latest changes.</li><li>"latest": Start from the latest.</li><li>"from-timestamp": Start from user-supplied timestamp.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>log.scan.remove-normalize</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to force the removal of the normalize node when streaming read. Note: This is dangerous and is likely to cause data errors if downstream is used to calculate aggregation and the input is not complete changelog.</td>
+        </tr>
         <tr>
             <td><h5>log.scan.timestamp-millis</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 8914e93c..0e123a07 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -54,6 +54,7 @@ import java.util.List;
 import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE;
 
 /**
  * Table source to create {@link FileStoreSource} under batch mode or change-tracking is disabled.
@@ -120,6 +121,10 @@ public class TableStoreSource
         } else if (table instanceof ChangelogWithKeyFileStoreTable) {
             Configuration options = Configuration.fromMap(table.schema().options());
 
+            if (options.get(LOG_SCAN_REMOVE_NORMALIZE)) {
+                return ChangelogMode.all();
+            }
+
             if (logStoreTableFactory == null
                     && options.get(CHANGELOG_PRODUCER) != ChangelogProducer.NONE) {
                 return ChangelogMode.all();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 21769cdf..10626a70 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -303,6 +303,15 @@ public class CoreOptions implements Serializable {
                     .defaultValue(LogChangelogMode.AUTO)
                     .withDescription("Specify the log changelog mode for table.");
 
+    public static final ConfigOption<Boolean> LOG_SCAN_REMOVE_NORMALIZE =
+            ConfigOptions.key("log.scan.remove-normalize")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to force the removal of the normalize node when streaming read."
+                                    + " Note: This is dangerous and is likely to cause data errors if downstream"
+                                    + " is used to calculate aggregation and the input is not complete changelog.");
+
     public static final ConfigOption<String> LOG_KEY_FORMAT =
             ConfigOptions.key("log.key.format")
                     .stringType()