Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/10 12:55:29 UTC

[flink-table-store] branch master updated: [hotfix] Support streaming reading for partial update with full compaction changelog producer

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 8094c2a3 [hotfix] Support streaming reading for partial update with full compaction changelog producer
8094c2a3 is described below

commit 8094c2a3ca1a01a99f395ff652b584d62d587a9d
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Nov 10 20:55:18 2022 +0800

    [hotfix] Support streaming reading for partial update with full compaction changelog producer
---
 .../flink/table/store/connector/source/FlinkSourceBuilder.java   | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index e975796c..802e0ac3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -43,7 +43,9 @@ import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Optional;
 
+import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.flink.table.store.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
+import static org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
 import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
 import static org.apache.flink.table.store.CoreOptions.MERGE_ENGINE;
 import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
@@ -138,9 +140,12 @@ public class FlinkSourceBuilder {
                         }
                     };
             if (table.schema().primaryKeys().size() > 0
-                    && mergeEngineDesc.containsKey(mergeEngine)) {
+                    && mergeEngineDesc.containsKey(mergeEngine)
+                    && conf.get(CHANGELOG_PRODUCER) != FULL_COMPACTION) {
                 throw new ValidationException(
-                        mergeEngineDesc.get(mergeEngine) + " continuous reading is not supported.");
+                        mergeEngineDesc.get(mergeEngine)
+                                + " continuous reading is not supported. "
+                                + "You can use full compaction changelog producer to support streaming reading.");
             }
 
             LogStartupMode startupMode = conf.get(LOG_SCAN);
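
The patch relaxes the validation in FlinkSourceBuilder: a table with a primary key and a merge engine such as partial-update is no longer rejected for continuous (streaming) reading when its changelog producer is set to full-compaction. Below is a minimal sketch, not part of the commit, of how a user might exercise the new behavior. The catalog name, warehouse path, table name, and schema are illustrative assumptions; the 'merge-engine' and 'changelog-producer' option keys correspond to CoreOptions.MERGE_ENGINE and CoreOptions.CHANGELOG_PRODUCER referenced in the diff above.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class PartialUpdateStreamingReadExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Illustrative catalog setup: a local Table Store catalog named 'my_catalog'.
            tEnv.executeSql(
                    "CREATE CATALOG my_catalog WITH ("
                            + " 'type' = 'table-store',"
                            + " 'warehouse' = 'file:///tmp/table_store'"
                            + ")");
            tEnv.executeSql("USE CATALOG my_catalog");

            // A partial-update table with the full-compaction changelog producer.
            // Before this commit, FlinkSourceBuilder rejected continuous reading of
            // such a table; with 'changelog-producer' = 'full-compaction' it is now allowed.
            tEnv.executeSql(
                    "CREATE TABLE IF NOT EXISTS orders ("
                            + " order_id BIGINT,"
                            + " price DOUBLE,"
                            + " order_status STRING,"
                            + " PRIMARY KEY (order_id) NOT ENFORCED"
                            + ") WITH ("
                            + " 'merge-engine' = 'partial-update',"
                            + " 'changelog-producer' = 'full-compaction'"
                            + ")");

            // Streaming (continuous) read of the partial-update table.
            tEnv.executeSql("SELECT * FROM orders").print();
        }
    }

With the full-compaction changelog producer, the table emits a complete changelog at each full compaction, which is what makes streaming consumption of the merged partial-update result feasible for downstream readers.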