You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/21 09:47:44 UTC
[flink-table-store] branch master updated: [hotfix] Optimize getChangelogMode in TableStoreSink and TableStoreSource
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 47f7d03 [hotfix] Optimize getChangelogMode in TableStoreSink and TableStoreSource
47f7d03 is described below
commit 47f7d03e46c34232aeab61d192e1df68c04ac92f
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Mar 21 17:47:41 2022 +0800
[hotfix] Optimize getChangelogMode in TableStoreSink and TableStoreSource
This closes #55
---
.../flink/table/store/connector/TableStore.java | 7 ++++--
.../table/store/connector/TableStoreFactory.java | 12 ++-------
.../table/store/connector/sink/TableStoreSink.java | 26 +++++++++++++------
.../store/connector/source/TableStoreSource.java | 29 ++++++++++++++++------
4 files changed, 48 insertions(+), 26 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index acfd094..a0a5859 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -130,6 +130,10 @@ public class TableStore {
return partitionType.getFieldNames();
}
+ public Configuration logOptions() {
+ return new DelegatingConfiguration(options, LOG_PREFIX);
+ }
+
public SourceBuilder sourceBuilder() {
return new SourceBuilder();
}
@@ -221,8 +225,7 @@ public class TableStore {
boolean latestContinuous = false;
if (isContinuous) {
- LogStartupMode startupMode =
- new DelegatingConfiguration(options, LOG_PREFIX).get(SCAN);
+ LogStartupMode startupMode = logOptions().get(SCAN);
latestContinuous = startupMode == LogStartupMode.LATEST;
}
return new FileStoreSource(
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
index d18cb40..cbe6f8a 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
@@ -31,8 +31,6 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
@@ -155,7 +153,7 @@ public class TableStoreFactory
}
@Override
- public DynamicTableSource createDynamicTableSource(Context context) {
+ public TableStoreSource createDynamicTableSource(Context context) {
return new TableStoreSource(
buildTableStore(context),
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
@@ -165,15 +163,9 @@ public class TableStoreFactory
}
@Override
- public DynamicTableSink createDynamicTableSink(Context context) {
+ public TableStoreSink createDynamicTableSink(Context context) {
return new TableStoreSink(
buildTableStore(context),
- LogChangelogMode.valueOf(
- context.getCatalogTable()
- .getOptions()
- .getOrDefault(
- LOG_PREFIX + CHANGELOG_MODE.key(),
- CHANGELOG_MODE.defaultValue().toString().toUpperCase())),
createLogContext(context),
createOptionalLogStoreFactory(context).orElse(null));
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 43fc727..eba324d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.connector.sink;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogLock;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.RequireCatalogLock;
@@ -41,12 +42,13 @@ import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
+
/** Table sink to create {@link StoreSink}. */
public class TableStoreSink
implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
private final TableStore tableStore;
- private final LogOptions.LogChangelogMode logChangelogMode;
private final DynamicTableFactory.Context logStoreContext;
@Nullable private final LogStoreTableFactory logStoreTableFactory;
@@ -56,19 +58,22 @@ public class TableStoreSink
public TableStoreSink(
TableStore tableStore,
- LogOptions.LogChangelogMode logChangelogMode,
DynamicTableFactory.Context logStoreContext,
@Nullable LogStoreTableFactory logStoreTableFactory) {
this.tableStore = tableStore;
- this.logChangelogMode = logChangelogMode;
this.logStoreContext = logStoreContext;
this.logStoreTableFactory = logStoreTableFactory;
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
- if (!tableStore.valueCountMode()
- && logChangelogMode == LogOptions.LogChangelogMode.UPSERT) {
+ if (tableStore.valueCountMode()) {
+ // no primary key, sink all changelogs
+ return requestedMode;
+ }
+
+ if (tableStore.logOptions().get(CHANGELOG_MODE) != LogOptions.LogChangelogMode.ALL) {
+ // with primary key, sink upsert changes by default
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
for (RowKind kind : requestedMode.getContainedKinds()) {
if (kind != RowKind.UPDATE_BEFORE) {
@@ -77,6 +82,14 @@ public class TableStoreSink
}
return builder.build();
}
+
+ // all changelog mode configured
+ if (!requestedMode.contains(RowKind.UPDATE_BEFORE)
+ || !requestedMode.contains(RowKind.UPDATE_AFTER)) {
+ throw new ValidationException(
+ "You cannot insert incomplete data into a table that "
+ + "has primary key and declares all changelog mode.");
+ }
return requestedMode;
}
@@ -131,8 +144,7 @@ public class TableStoreSink
@Override
public DynamicTableSink copy() {
TableStoreSink copied =
- new TableStoreSink(
- tableStore, logChangelogMode, logStoreContext, logStoreTableFactory);
+ new TableStoreSink(tableStore, logStoreContext, logStoreTableFactory);
copied.staticPartitions = new HashMap<>(staticPartitions);
copied.overwrite = overwrite;
copied.lockFactory = lockFactory;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 6dfbc99..171ccb5 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.store.connector.source;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
@@ -47,6 +48,11 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
+import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
+import static org.apache.flink.table.store.log.LogOptions.LogChangelogMode.ALL;
+import static org.apache.flink.table.store.log.LogOptions.LogConsistency.TRANSACTIONAL;
+
/**
* Table source to create {@link FileStoreSource} under batch mode or change-tracking is disabled.
* For streaming mode with change-tracking enabled and FULL scan mode, it will create a {@link
@@ -78,13 +84,22 @@ public class TableStoreSource
@Override
public ChangelogMode getChangelogMode() {
- return streaming
- ? tableStore.valueCountMode()
- ? ChangelogMode.all()
- // TODO: optimize upsert when consistency mode is transactional and
- // log.changelog-mode is all
- : ChangelogMode.upsert()
- : ChangelogMode.insertOnly();
+ if (!streaming) {
+ // batch merge all, return insert only
+ return ChangelogMode.insertOnly();
+ }
+
+ if (tableStore.valueCountMode()) {
+ // no primary key, return all
+ return ChangelogMode.all();
+ }
+
+ // optimization: with transactional consistency and the all changelog mode, the generation
+ // of normalize nodes can be avoided. See the validation in TableStoreSink.getChangelogMode.
+ Configuration logOptions = tableStore.logOptions();
+ return logOptions.get(CONSISTENCY) == TRANSACTIONAL && logOptions.get(CHANGELOG_MODE) == ALL
+ ? ChangelogMode.all()
+ : ChangelogMode.upsert();
}
@Override