You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/10 12:55:29 UTC
[flink-table-store] branch master updated: [hotfix] Support streaming reading for partial update with full compaction changelog producer
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 8094c2a3 [hotfix] Support streaming reading for partial update with full compaction changelog producer
8094c2a3 is described below
commit 8094c2a3ca1a01a99f395ff652b584d62d587a9d
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Nov 10 20:55:18 2022 +0800
[hotfix] Support streaming reading for partial update with full compaction changelog producer
---
.../flink/table/store/connector/source/FlinkSourceBuilder.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
index e975796c..802e0ac3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FlinkSourceBuilder.java
@@ -43,7 +43,9 @@ import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Optional;
+import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.flink.table.store.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
+import static org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
import static org.apache.flink.table.store.CoreOptions.LOG_SCAN;
import static org.apache.flink.table.store.CoreOptions.MERGE_ENGINE;
import static org.apache.flink.table.store.connector.FlinkConnectorOptions.COMPACTION_MANUAL_TRIGGERED;
@@ -138,9 +140,12 @@ public class FlinkSourceBuilder {
}
};
if (table.schema().primaryKeys().size() > 0
- && mergeEngineDesc.containsKey(mergeEngine)) {
+ && mergeEngineDesc.containsKey(mergeEngine)
+ && conf.get(CHANGELOG_PRODUCER) != FULL_COMPACTION) {
throw new ValidationException(
- mergeEngineDesc.get(mergeEngine) + " continuous reading is not supported.");
+ mergeEngineDesc.get(mergeEngine)
+ + " continuous reading is not supported. "
+ + "You can use full compaction changelog producer to support streaming reading.");
}
LogStartupMode startupMode = conf.get(LOG_SCAN);