You are viewing a plain-text version of this content; the hyperlink to its canonical page was not preserved in this copy.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/21 09:47:44 UTC

[flink-table-store] branch master updated: [hotfix] Optimize getChangelogMode in TableStoreSink and TableStoreSource

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 47f7d03  [hotfix] Optimize getChangelogMode in TableStoreSink and TableStoreSource
47f7d03 is described below

commit 47f7d03e46c34232aeab61d192e1df68c04ac92f
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Mar 21 17:47:41 2022 +0800

    [hotfix] Optimize getChangelogMode in TableStoreSink and TableStoreSource
    
    This closes #55
---
 .../flink/table/store/connector/TableStore.java    |  7 ++++--
 .../table/store/connector/TableStoreFactory.java   | 12 ++-------
 .../table/store/connector/sink/TableStoreSink.java | 26 +++++++++++++------
 .../store/connector/source/TableStoreSource.java   | 29 ++++++++++++++++------
 4 files changed, 48 insertions(+), 26 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index acfd094..a0a5859 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -130,6 +130,10 @@ public class TableStore {
         return partitionType.getFieldNames();
     }
 
+    public Configuration logOptions() {
+        return new DelegatingConfiguration(options, LOG_PREFIX);
+    }
+
     public SourceBuilder sourceBuilder() {
         return new SourceBuilder();
     }
@@ -221,8 +225,7 @@ public class TableStore {
 
             boolean latestContinuous = false;
             if (isContinuous) {
-                LogStartupMode startupMode =
-                        new DelegatingConfiguration(options, LOG_PREFIX).get(SCAN);
+                LogStartupMode startupMode = logOptions().get(SCAN);
                 latestContinuous = startupMode == LogStartupMode.LATEST;
             }
             return new FileStoreSource(
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
index d18cb40..cbe6f8a 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
@@ -31,8 +31,6 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
@@ -155,7 +153,7 @@ public class TableStoreFactory
     }
 
     @Override
-    public DynamicTableSource createDynamicTableSource(Context context) {
+    public TableStoreSource createDynamicTableSource(Context context) {
         return new TableStoreSource(
                 buildTableStore(context),
                 context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
@@ -165,15 +163,9 @@ public class TableStoreFactory
     }
 
     @Override
-    public DynamicTableSink createDynamicTableSink(Context context) {
+    public TableStoreSink createDynamicTableSink(Context context) {
         return new TableStoreSink(
                 buildTableStore(context),
-                LogChangelogMode.valueOf(
-                        context.getCatalogTable()
-                                .getOptions()
-                                .getOrDefault(
-                                        LOG_PREFIX + CHANGELOG_MODE.key(),
-                                        CHANGELOG_MODE.defaultValue().toString().toUpperCase())),
                 createLogContext(context),
                 createOptionalLogStoreFactory(context).orElse(null));
     }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 43fc727..eba324d 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.connector.sink;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogLock;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.RequireCatalogLock;
@@ -41,12 +42,13 @@ import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
+
 /** Table sink to create {@link StoreSink}. */
 public class TableStoreSink
         implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
 
     private final TableStore tableStore;
-    private final LogOptions.LogChangelogMode logChangelogMode;
     private final DynamicTableFactory.Context logStoreContext;
     @Nullable private final LogStoreTableFactory logStoreTableFactory;
 
@@ -56,19 +58,22 @@ public class TableStoreSink
 
     public TableStoreSink(
             TableStore tableStore,
-            LogOptions.LogChangelogMode logChangelogMode,
             DynamicTableFactory.Context logStoreContext,
             @Nullable LogStoreTableFactory logStoreTableFactory) {
         this.tableStore = tableStore;
-        this.logChangelogMode = logChangelogMode;
         this.logStoreContext = logStoreContext;
         this.logStoreTableFactory = logStoreTableFactory;
     }
 
     @Override
     public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
-        if (!tableStore.valueCountMode()
-                && logChangelogMode == LogOptions.LogChangelogMode.UPSERT) {
+        if (tableStore.valueCountMode()) {
+            // no primary key, sink all changelogs
+            return requestedMode;
+        }
+
+        if (tableStore.logOptions().get(CHANGELOG_MODE) != LogOptions.LogChangelogMode.ALL) {
+            // with primary key, default sink upsert
             ChangelogMode.Builder builder = ChangelogMode.newBuilder();
             for (RowKind kind : requestedMode.getContainedKinds()) {
                 if (kind != RowKind.UPDATE_BEFORE) {
@@ -77,6 +82,14 @@ public class TableStoreSink
             }
             return builder.build();
         }
+
+        // all changelog mode configured
+        if (!requestedMode.contains(RowKind.UPDATE_BEFORE)
+                || !requestedMode.contains(RowKind.UPDATE_AFTER)) {
+            throw new ValidationException(
+                    "You cannot insert incomplete data into a table that "
+                            + "has primary key and declares all changelog mode.");
+        }
         return requestedMode;
     }
 
@@ -131,8 +144,7 @@ public class TableStoreSink
     @Override
     public DynamicTableSink copy() {
         TableStoreSink copied =
-                new TableStoreSink(
-                        tableStore, logChangelogMode, logStoreContext, logStoreTableFactory);
+                new TableStoreSink(tableStore, logStoreContext, logStoreTableFactory);
         copied.staticPartitions = new HashMap<>(staticPartitions);
         copied.overwrite = overwrite;
         copied.lockFactory = lockFactory;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 6dfbc99..171ccb5 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.connector.source;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -47,6 +48,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
+import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
+import static org.apache.flink.table.store.log.LogOptions.LogChangelogMode.ALL;
+import static org.apache.flink.table.store.log.LogOptions.LogConsistency.TRANSACTIONAL;
+
 /**
  * Table source to create {@link FileStoreSource} under batch mode or change-tracking is disabled.
  * For streaming mode with change-tracking enabled and FULL scan mode, it will create a {@link
@@ -78,13 +84,22 @@ public class TableStoreSource
 
     @Override
     public ChangelogMode getChangelogMode() {
-        return streaming
-                ? tableStore.valueCountMode()
-                        ? ChangelogMode.all()
-                        // TODO: optimize upsert when consistency mode is transactional and
-                        // log.changelog-mode is all
-                        : ChangelogMode.upsert()
-                : ChangelogMode.insertOnly();
+        if (!streaming) {
+            // batch merge all, return insert only
+            return ChangelogMode.insertOnly();
+        }
+
+        if (tableStore.valueCountMode()) {
+            // no primary key, return all
+            return ChangelogMode.all();
+        }
+
+        // optimization: transaction consistency and all changelog mode avoid the generation of
+        // normalized nodes. See TableStoreSink.getChangelogMode validation.
+        Configuration logOptions = tableStore.logOptions();
+        return logOptions.get(CONSISTENCY) == TRANSACTIONAL && logOptions.get(CHANGELOG_MODE) == ALL
+                ? ChangelogMode.all()
+                : ChangelogMode.upsert();
     }
 
     @Override