You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/08/23 03:11:54 UTC
[flink-table-store] branch release-0.2 updated: [FLINK-29070] Provide an option to force the removal of the normalize node during streaming reads
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new fa11e874 [FLINK-29070] Provide an option to force the removal of the normalize node during streaming reads
fa11e874 is described below
commit fa11e8748c763fcc897f0b688c9d2e6d7c1334a3
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Aug 23 11:11:49 2022 +0800
[FLINK-29070] Provide an option to force the removal of the normalize node during streaming reads
This closes #273
---
docs/layouts/shortcodes/generated/core_configuration.html | 6 ++++++
.../flink/table/store/connector/source/TableStoreSource.java | 5 +++++
.../src/main/java/org/apache/flink/table/store/CoreOptions.java | 9 +++++++++
3 files changed, 20 insertions(+)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 36eea4d7..b093a1a4 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -110,6 +110,12 @@
<td><p>Enum</p></td>
<td>Specify the startup mode for log consumer.<br /><br />Possible values:<ul><li>"full": Perform a snapshot on the table upon first startup, and continue to read the latest changes.</li><li>"latest": Start from the latest.</li><li>"from-timestamp": Start from user-supplied timestamp.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>log.scan.remove-normalize</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to force the removal of the normalize node when streaming read. Note: This is dangerous and is likely to cause data errors if downstream is used to calculate aggregation and the input is not complete changelog.</td>
+ </tr>
<tr>
<td><h5>log.scan.timestamp-millis</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 8914e93c..0e123a07 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -54,6 +54,7 @@ import java.util.List;
import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
+import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE;
/**
* Table source to create {@link FileStoreSource} under batch mode or change-tracking is disabled.
@@ -120,6 +121,10 @@ public class TableStoreSource
} else if (table instanceof ChangelogWithKeyFileStoreTable) {
Configuration options = Configuration.fromMap(table.schema().options());
+ if (options.get(LOG_SCAN_REMOVE_NORMALIZE)) {
+ return ChangelogMode.all();
+ }
+
if (logStoreTableFactory == null
&& options.get(CHANGELOG_PRODUCER) != ChangelogProducer.NONE) {
return ChangelogMode.all();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 21769cdf..10626a70 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -303,6 +303,15 @@ public class CoreOptions implements Serializable {
.defaultValue(LogChangelogMode.AUTO)
.withDescription("Specify the log changelog mode for table.");
+ public static final ConfigOption<Boolean> LOG_SCAN_REMOVE_NORMALIZE =
+ ConfigOptions.key("log.scan.remove-normalize")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to force the removal of the normalize node when streaming read."
+ + " Note: This is dangerous and is likely to cause data errors if downstream"
+ + " is used to calculate aggregation and the input is not complete changelog.");
+
public static final ConfigOption<String> LOG_KEY_FORMAT =
ConfigOptions.key("log.key.format")
.stringType()