You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/25 06:53:36 UTC

[flink-table-store] branch master updated: [FLINK-26669] Refactor ReadWriteTableITCase

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 36f045c  [FLINK-26669] Refactor ReadWriteTableITCase
36f045c is described below

commit 36f045c55c2095612ace2637c6f4a03da2837467
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Fri Mar 25 14:53:31 2022 +0800

    [FLINK-26669] Refactor ReadWriteTableITCase
    
    This closes #58
---
 .../flink/table/store/connector/TableStore.java    |  21 +-
 .../connector/sink/BucketStreamPartitioner.java    |  11 +-
 .../table/store/connector/sink/StoreSink.java      |   7 +-
 .../store/connector/sink/StoreSinkWriter.java      |   3 +-
 .../store/connector/ReadWriteTableITCase.java      | 786 ++++++++++++++++-----
 .../table/store/connector/TableStoreTestBase.java  |   8 +
 .../table/store/connector/sink/StoreSinkTest.java  |   2 +
 .../table/store/sink/SinkRecordConverter.java      |  28 +-
 .../table/store/kafka/KafkaLogSinkProvider.java    |  10 +-
 .../table/store/kafka/KafkaLogStoreFactory.java    |   6 +-
 10 files changed, 669 insertions(+), 213 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index a33c06f..9374933 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -83,9 +83,12 @@ public class TableStore {
     /** partition keys, default no partition. */
     private int[] partitions = new int[0];
 
-    /** primary keys, default no key. */
+    /** file store primary keys which exclude partition fields if partitioned, default no key. */
     private int[] primaryKeys = new int[0];
 
+    /** log store primary keys which include partition fields if partitioned, default no key. */
+    private int[] logPrimaryKeys = new int[0];
+
     private RowType type;
 
     private ObjectIdentifier tableIdentifier;
@@ -112,6 +115,7 @@ public class TableStore {
 
     public TableStore withPrimaryKeys(int[] primaryKeys) {
         this.primaryKeys = primaryKeys;
+        this.logPrimaryKeys = primaryKeys;
         adjustIndexAndValidate();
         return this;
     }
@@ -186,15 +190,16 @@ public class TableStore {
     }
 
     private void adjustIndexAndValidate() {
-        if (primaryKeys.length > 0 && partitions.length > 0) {
-            List<Integer> pkList = Arrays.stream(primaryKeys).boxed().collect(Collectors.toList());
+        if (logPrimaryKeys.length > 0 && partitions.length > 0) {
+            List<Integer> pkList =
+                    Arrays.stream(logPrimaryKeys).boxed().collect(Collectors.toList());
             List<Integer> partitionList =
                     Arrays.stream(partitions).boxed().collect(Collectors.toList());
 
             String pkInfo =
                     type == null
                             ? pkList.toString()
-                            : TypeUtils.project(type, primaryKeys).getFieldNames().toString();
+                            : TypeUtils.project(type, logPrimaryKeys).getFieldNames().toString();
             String partitionInfo =
                     type == null
                             ? partitionList.toString()
@@ -205,7 +210,9 @@ public class TableStore {
                             "Primary key constraint %s should include all partition fields %s",
                             pkInfo, partitionInfo));
             primaryKeys =
-                    Arrays.stream(primaryKeys).filter(pk -> !partitionList.contains(pk)).toArray();
+                    Arrays.stream(logPrimaryKeys)
+                            .filter(pk -> !partitionList.contains(pk))
+                            .toArray();
 
             Preconditions.checkState(
                     primaryKeys.length > 0,
@@ -354,7 +361,8 @@ public class TableStore {
             int numBucket = options.get(BUCKET);
 
             BucketStreamPartitioner partitioner =
-                    new BucketStreamPartitioner(numBucket, type, partitions, primaryKeys);
+                    new BucketStreamPartitioner(
+                            numBucket, type, partitions, primaryKeys, logPrimaryKeys);
             DataStream<RowData> partitioned =
                     new DataStream<>(
                             input.getExecutionEnvironment(),
@@ -366,6 +374,7 @@ public class TableStore {
                             fileStore,
                             partitions,
                             primaryKeys,
+                            logPrimaryKeys,
                             numBucket,
                             lockFactory,
                             overwritePartition,
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
index f69ecc0..dedf5d8 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
@@ -33,22 +33,29 @@ public class BucketStreamPartitioner extends StreamPartitioner<RowData> {
     private final RowType inputType;
     private final int[] partitions;
     private final int[] primaryKeys;
+    private final int[] logPrimaryKeys;
 
     private transient SinkRecordConverter recordConverter;
 
     public BucketStreamPartitioner(
-            int numBucket, RowType inputType, int[] partitions, int[] primaryKeys) {
+            int numBucket,
+            RowType inputType,
+            int[] partitions,
+            int[] primaryKeys,
+            final int[] logPrimaryKeys) {
         this.numBucket = numBucket;
         this.inputType = inputType;
         this.partitions = partitions;
         this.primaryKeys = primaryKeys;
+        this.logPrimaryKeys = logPrimaryKeys;
     }
 
     @Override
     public void setup(int numberOfChannels) {
         super.setup(numberOfChannels);
         this.recordConverter =
-                new SinkRecordConverter(numBucket, inputType, partitions, primaryKeys);
+                new SinkRecordConverter(
+                        numBucket, inputType, partitions, primaryKeys, logPrimaryKeys);
     }
 
     @Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index a848ca6..88cf8b5 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -63,6 +63,8 @@ public class StoreSink<WriterStateT, LogCommT>
 
     private final int[] primaryKeys;
 
+    private final int[] logPrimaryKeys;
+
     private final int numBucket;
 
     @Nullable private final CatalogLock.Factory lockFactory;
@@ -76,6 +78,7 @@ public class StoreSink<WriterStateT, LogCommT>
             FileStore fileStore,
             int[] partitions,
             int[] primaryKeys,
+            int[] logPrimaryKeys,
             int numBucket,
             @Nullable CatalogLock.Factory lockFactory,
             @Nullable Map<String, String> overwritePartition,
@@ -84,6 +87,7 @@ public class StoreSink<WriterStateT, LogCommT>
         this.fileStore = fileStore;
         this.partitions = partitions;
         this.primaryKeys = primaryKeys;
+        this.logPrimaryKeys = logPrimaryKeys;
         this.numBucket = numBucket;
         this.lockFactory = lockFactory;
         this.overwritePartition = overwritePartition;
@@ -117,7 +121,8 @@ public class StoreSink<WriterStateT, LogCommT>
                         numBucket,
                         primaryKeys.length > 0 ? fileStore.valueType() : fileStore.keyType(),
                         partitions,
-                        primaryKeys),
+                        primaryKeys,
+                        logPrimaryKeys),
                 overwritePartition != null,
                 logWriter,
                 logCallback);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index 4effdd8..6a568a6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -105,8 +105,9 @@ public class StoreSinkWriter<WriterStateT>
             throw new IOException(e);
         }
 
-        // write to log store
+        // write to log store, need to preserve original pk (which includes partition fields)
         if (logWriter != null) {
+            record = recordConverter.convertToLogSinkRecord(record);
             logWriter.write(record, context);
         }
     }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 819e018..7b6d648 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -19,229 +19,577 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.table.store.log.LogOptions;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
 
+import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
-import javax.annotation.Nullable;
-
-import java.math.BigDecimal;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import scala.collection.JavaConverters;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
+import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** IT cases for testing querying managed table dml. */
-@RunWith(Parameterized.class)
-public class ReadWriteTableITCase extends TableStoreTestBase {
+/** IT cases for managed table dml. */
+public class ReadWriteTableITCase extends KafkaTableTestBase {
 
-    private final boolean hasPk;
-    @Nullable private final Boolean duplicate;
+    private String rootPath;
 
-    public ReadWriteTableITCase(
-            RuntimeExecutionMode executionMode,
-            String tableName,
-            boolean enableLogStore,
-            boolean hasPk,
-            @Nullable Boolean duplicate,
-            ExpectedResult expectedResult) {
-        super(executionMode, tableName, enableLogStore, expectedResult);
-        this.hasPk = hasPk;
-        this.duplicate = duplicate;
+    @Test
+    public void testBatchWriteWithPartitionedRecordsWithPk() throws Exception {
+        String managedTable = prepareEnvAndWrite(false, false, true, true);
+
+        // input is dailyRates()
+        List<Row> expectedRecords =
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                        changelogRow("+I", "Yen", 1L, "2022-01-01"),
+                        changelogRow("+I", "Euro", 114L, "2022-01-01"),
+                        // part = 2022-01-02
+                        changelogRow("+I", "Euro", 119L, "2022-01-02"));
+        // test batch read
+        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+        checkFileStorePath(tEnv, managedTable);
+
+        // test streaming read
+        final StreamTableEnvironment streamTableEnv =
+                StreamTableEnvironment.create(buildStreamEnv());
+        registerTable(streamTableEnv, managedTable);
+        BlockingIterator<Row, Row> streamIter =
+                collectAndCheck(
+                        streamTableEnv, managedTable, Collections.emptyMap(), expectedRecords);
+
+        // overwrite static partition 2022-01-02
+        prepareEnvAndOverwrite(
+                managedTable,
+                Collections.singletonMap("dt", "'2022-01-02'"),
+                Arrays.asList(new String[] {"'Euro'", "100"}, new String[] {"'Yen'", "1"}));
+
+        // streaming iter will not receive any changelog
+        assertNoMoreRecords(streamIter);
+
+        // batch read to check partition refresh
+        expectedRecords = new ArrayList<>(expectedRecords);
+        expectedRecords.remove(3);
+        expectedRecords.add(changelogRow("+I", "Euro", 100L, "2022-01-02"));
+        expectedRecords.add(changelogRow("+I", "Yen", 1L, "2022-01-02"));
+        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
     }
 
-    @Override
-    public void after() {
-        tEnv.executeSql("DROP TABLE `source_table`");
-        super.after();
+    @Test
+    public void testBatchWriteWithPartitionedRecordsWithoutPk() throws Exception {
+        String managedTable = prepareEnvAndWrite(false, false, true, false);
+
+        // input is dailyRates()
+        List<Row> expectedRecords = dailyRates();
+
+        // test batch read
+        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+        checkFileStorePath(tEnv, managedTable);
+
+        // test batch read with latest-scan
+        collectAndCheck(
+                        tEnv,
+                        managedTable,
+                        Collections.singletonMap(
+                                LogOptions.SCAN.key(),
+                                LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
+                        Collections.emptyList())
+                .close();
+
+        // overwrite dynamic partition
+        prepareEnvAndOverwrite(
+                managedTable,
+                Collections.emptyMap(),
+                Arrays.asList(
+                        new String[] {"'Euro'", "90", "'2022-01-01'"},
+                        new String[] {"'Yen'", "2", "'2022-01-02'"}));
+
+        // test streaming read
+        final StreamTableEnvironment streamTableEnv =
+                StreamTableEnvironment.create(buildStreamEnv());
+        registerTable(streamTableEnv, managedTable);
+        collectAndCheck(
+                        streamTableEnv,
+                        managedTable,
+                        Collections.emptyMap(),
+                        Arrays.asList(
+                                // part = 2022-01-01
+                                changelogRow("+I", "Euro", 90L, "2022-01-01"),
+                                // part = 2022-01-02
+                                changelogRow("+I", "Yen", 2L, "2022-01-02")))
+                .close();
     }
 
     @Test
-    public void testReadWriteNonPartitioned() throws Exception {
-        String statement =
-                String.format("INSERT INTO %s \nSELECT * FROM `source_table`", tableIdentifier);
-        if (expectedResult.success) {
-            tEnv.executeSql(statement).await();
-            TableResult result =
-                    tEnv.executeSql(String.format("SELECT * FROM %s", tableIdentifier));
-            List<Row> actual = new ArrayList<>();
-            try (CloseableIterator<Row> iterator = result.collect()) {
-                while (iterator.hasNext()) {
-                    actual.add(iterator.next());
-                }
-            }
-            assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedResult.expectedRecords);
-            String relativeFilePath = FileStoreOptions.relativeTablePath(tableIdentifier);
-            // check snapshot file path
-            assertThat(Paths.get(rootPath, relativeFilePath, "snapshot")).exists();
-            // check manifest file path
-            assertThat(Paths.get(rootPath, relativeFilePath, "manifest")).exists();
-
-            if (enableLogStore) {
-                assertThat(topicExists(tableIdentifier.asSummaryString())).isTrue();
-            }
+    public void testBatchWriteWithNonPartitionedRecordsWithPk() throws Exception {
+        String managedTable = prepareEnvAndWrite(false, false, false, true);
+
+        // input is rates()
+        List<Row> expectedRecords =
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar", 102L),
+                        changelogRow("+I", "Yen", 1L),
+                        changelogRow("+I", "Euro", 119L));
+        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+        checkFileStorePath(tEnv, managedTable);
+
+        // overwrite the whole table
+        prepareEnvAndOverwrite(
+                managedTable,
+                Collections.emptyMap(),
+                Collections.singletonList(new String[] {"'Euro'", "100"}));
+        expectedRecords = new ArrayList<>(expectedRecords);
+        expectedRecords.clear();
+        expectedRecords.add(changelogRow("+I", "Euro", 100L));
+        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+    }
+
+    @Test
+    public void testBatchWriteNonPartitionedRecordsWithoutPk() throws Exception {
+        String managedTable = prepareEnvAndWrite(false, false, false, false);
+
+        // input is rates()
+        List<Row> expectedRecords = rates();
+        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+        checkFileStorePath(tEnv, managedTable);
+    }
+
+    @Ignore("changelog case is failed")
+    @Test
+    public void testEnableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
+        String managedTable = prepareEnvAndWrite(true, true, true, true);
+
+        // input is dailyRatesChangelogWithoutUB()
+        // test hybrid read
+        List<Row> expectedRecords =
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                        // part = 2022-01-02
+                        changelogRow("+I", "Euro", 119L, "2022-01-02"));
+        BlockingIterator<Row, Row> streamIter =
+                collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords);
+        checkFileStorePath(tEnv, managedTable);
+
+        // overwrite partition 2022-01-02
+        prepareEnvAndOverwrite(
+                managedTable,
+                Collections.singletonMap("dt", "'2022-01-02'"),
+                Arrays.asList(new String[] {"'Euro'", "100"}, new String[] {"'Yen'", "1"}));
+
+        // batch read to check data refresh
+        final StreamTableEnvironment batchTableEnv =
+                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+        registerTable(batchTableEnv, managedTable);
+        collectAndCheck(
+                batchTableEnv,
+                managedTable,
+                Collections.emptyMap(),
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                        // part = 2022-01-02
+                        changelogRow("+I", "Euro", 100L, "2022-01-02"),
+                        changelogRow("+I", "Yen", 1L, "2022-01-02")));
+
+        // check no changelog generated for streaming read
+        assertNoMoreRecords(streamIter);
+    }
+
+    @Ignore("file store continuous read is failed, actual has size 1")
+    @Test
+    public void testDisableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
+        String managedTable = prepareEnvAndWrite(true, false, true, true);
+
+        // disable log store and read from latest
+        collectAndCheck(
+                        tEnv,
+                        managedTable,
+                        Collections.singletonMap(
+                                LogOptions.SCAN.key(),
+                                LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
+                        Collections.emptyList())
+                .close();
+
+        // input is dailyRatesChangelogWithoutUB()
+        // file store continuous read
+        // will not merge, at least collect two records
+        List<Row> expectedRecords =
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                        // part = 2022-01-02
+                        changelogRow("+I", "Euro", 119L, "2022-01-02"));
+        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+    }
+
+    @Test
+    public void testStreamingReadWritePartitionedRecordsWithoutPk() throws Exception {
+        String managedTable = prepareEnvAndWrite(true, true, true, false);
+
+        // input is dailyRatesChangelogWithUB()
+        // enable log store, file store bounded read with merge
+        List<Row> expectedRecords =
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                        // part = 2022-01-02
+                        changelogRow("+I", "Euro", 115L, "2022-01-02"));
+        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+    }
+
+    @Test
+    public void testStreamingReadWriteNonPartitionedRecordsWithPk() throws Exception {
+        String managedTable = prepareEnvAndWrite(true, true, false, true);
+
+        // input is ratesChangelogWithoutUB()
+        // enable log store, file store bounded read with merge
+        List<Row> expectedRecords =
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar", 102L), changelogRow("+I", "Euro", 119L));
+        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+    }
+
+    @Test
+    public void testStreamingReadWriteNonPartitionedRecordsWithoutPk() throws Exception {
+        String managedTable = prepareEnvAndWrite(true, true, false, false);
+
+        // input is ratesChangelogWithUB()
+        // enable log store, with default full scan mode, will merge
+        List<Row> expectedRecords =
+                Arrays.asList(
+                        changelogRow("+I", "US Dollar", 102L), changelogRow("+I", "Euro", 119L));
+
+        collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+    }
+
+    // ------------------------ Tools ----------------------------------
+
+    private String prepareEnvAndWrite(
+            boolean streaming, boolean enableLogStore, boolean partitioned, boolean hasPk)
+            throws Exception {
+        Map<String, String> tableOptions = new HashMap<>();
+        rootPath = TEMPORARY_FOLDER.newFolder().getPath();
+        tableOptions.put(FileStoreOptions.FILE_PATH.key(), rootPath);
+        if (enableLogStore) {
+            tableOptions.put(LOG_SYSTEM.key(), "kafka");
+            tableOptions.put(LOG_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers());
+        }
+        String sourceTable = "source_table_" + UUID.randomUUID();
+        String managedTable = "managed_table_" + UUID.randomUUID();
+        EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().inStreamingMode();
+        String helperTableDdl;
+        if (streaming) {
+            helperTableDdl =
+                    prepareHelperSourceWithChangelogRecords(sourceTable, partitioned, hasPk);
+            env = buildStreamEnv();
+            builder.inStreamingMode();
         } else {
-            assertThatThrownBy(
-                            () -> {
-                                tEnv.executeSql(statement).await();
-                                tEnv.executeSql(String.format("SELECT * FROM %s", tableIdentifier))
-                                        .collect();
-                            })
-                    .isInstanceOf(expectedResult.expectedType)
-                    .hasMessageContaining(expectedResult.expectedMessage);
+            helperTableDdl =
+                    prepareHelperSourceWithInsertOnlyRecords(sourceTable, partitioned, hasPk);
+            env = buildBatchEnv();
+            builder.inBatchMode();
         }
+        String managedTableDdl = prepareManagedTableDdl(sourceTable, managedTable, tableOptions);
+        String insertQuery = prepareInsertIntoQuery(sourceTable, managedTable);
+
+        tEnv = StreamTableEnvironment.create(env, builder.build());
+        tEnv.executeSql(helperTableDdl);
+        tEnv.executeSql(managedTableDdl);
+        tEnv.executeSql(insertQuery).await();
+        return managedTable;
     }
 
-    @Parameterized.Parameters(
-            name =
-                    "executionMode-{0}, tableName-{1}, "
-                            + "enableLogStore-{2}, hasPk-{3},"
-                            + " duplicate-{4}, expectedResult-{5}")
-    public static List<Object[]> data() {
-        List<Object[]> specs = new ArrayList<>();
-        // batch cases
-        specs.add(
-                new Object[] {
-                    RuntimeExecutionMode.BATCH,
-                    "table_" + UUID.randomUUID(),
-                    false, // disable log store
-                    false, // no pk
-                    false, // without duplicate
-                    new ExpectedResult().success(true).expectedRecords(insertOnlyCities(false))
-                });
-        specs.add(
-                new Object[] {
-                    RuntimeExecutionMode.BATCH,
-                    "table_" + UUID.randomUUID(),
-                    false, // disable log store
-                    false, // no pk
-                    true, //  with duplicate
-                    new ExpectedResult().success(true).expectedRecords(insertOnlyCities(true))
-                });
-        List<Row> expected = new ArrayList<>(rates());
-        expected.remove(1);
-        specs.add(
-                new Object[] {
-                    RuntimeExecutionMode.BATCH,
-                    "table_" + UUID.randomUUID(),
-                    false, // disable log store
-                    true, // has pk
-                    null, // without duplicate
-                    new ExpectedResult().success(true).expectedRecords(expected)
-                });
-        // TODO: streaming with log system
+    private void prepareEnvAndOverwrite(
+            String managedTable,
+            Map<String, String> staticPartitions,
+            List<String[]> overwriteRecords)
+            throws Exception {
+        final StreamTableEnvironment batchEnv =
+                StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+        registerTable(batchEnv, managedTable);
+        String insertQuery =
+                prepareInsertOverwriteQuery(managedTable, staticPartitions, overwriteRecords);
+        batchEnv.executeSql(insertQuery).await();
+    }
 
-        // TODO: add overwrite case
+    private void registerTable(StreamTableEnvironment tEnvToRegister, String managedTable)
+            throws Exception {
+        String cat = this.tEnv.getCurrentCatalog();
+        String db = this.tEnv.getCurrentDatabase();
+        ObjectPath objectPath = new ObjectPath(db, managedTable);
+        CatalogBaseTable table = this.tEnv.getCatalog(cat).get().getTable(objectPath);
+        tEnvToRegister.getCatalog(cat).get().createTable(objectPath, table, false);
+    }
 
-        return specs;
+    private BlockingIterator<Row, Row> collect(
+            StreamTableEnvironment tEnv, String selectQuery, int expectedSize, List<Row> actual)
+            throws Exception {
+        TableResult result = tEnv.executeSql(selectQuery);
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(result.collect());
+        actual.addAll(iterator.collect(expectedSize));
+        return iterator;
     }
 
-    @Override
-    protected void prepareEnv() {
-        if (hasPk) {
-            if (executionMode == RuntimeExecutionMode.STREAMING) {
-                registerUpsertRecordsWithPk();
-            } else {
-                registerInsertOnlyRecordsWithPk();
-            }
-        } else {
-            if (duplicate != null) {
-                registerInsertOnlyRecordsWithoutPk(duplicate);
-            } else {
-                registerInsertUpdateDeleteRecordsWithoutPk();
-            }
+    private BlockingIterator<Row, Row> collectAndCheck(
+            StreamTableEnvironment tEnv,
+            String managedTable,
+            Map<String, String> hints,
+            List<Row> expectedRecords)
+            throws Exception {
+        String selectQuery = prepareSimpleSelectQuery(managedTable, hints);
+        List<Row> actual = new ArrayList<>();
+        BlockingIterator<Row, Row> iterator =
+                collect(tEnv, selectQuery, expectedRecords.size(), actual);
+
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRecords);
+        return iterator;
+    }
+
+    private void checkFileStorePath(StreamTableEnvironment tEnv, String managedTable) {
+        String relativeFilePath =
+                FileStoreOptions.relativeTablePath(
+                        ObjectIdentifier.of(
+                                tEnv.getCurrentCatalog(), tEnv.getCurrentDatabase(), managedTable));
+        // check snapshot file path
+        assertThat(Paths.get(rootPath, relativeFilePath, "snapshot")).exists();
+        // check manifest file path
+        assertThat(Paths.get(rootPath, relativeFilePath, "manifest")).exists();
+    }
+
+    private static void assertNoMoreRecords(BlockingIterator<Row, Row> iterator) {
+        List<Row> expectedRecords = Collections.emptyList();
+        try {
+            // set expectation size to 1 to let time pass by until timeout
+            expectedRecords = iterator.collect(1, 1L, TimeUnit.MINUTES);
+            iterator.close();
+        } catch (Exception ignored) {
+            // don't throw exception
         }
+        assertThat(expectedRecords).isEmpty();
     }
 
-    private void registerInsertUpdateDeleteRecordsWithoutPk() {
-        tEnv.executeSql(
-                String.format(
-                        "CREATE TABLE source_table (\n"
-                                + "  user_id STRING,\n"
-                                + "  user_name STRING,\n"
-                                + "  email STRING,\n"
-                                + "  balance DECIMAL(18,2)\n"
-                                + ") WITH (\n"
-                                + " 'connector' = 'values',\n"
-                                + " 'bounded' = '%s',\n"
-                                + " 'data-id' = '%s',\n"
-                                + " 'changelog-mode' = 'I,UA,UB,D',\n"
-                                + " 'disable-lookup' = 'true'\n"
-                                + ")",
-                        executionMode == RuntimeExecutionMode.BATCH,
-                        registerData(TestData.userChangelog())));
-        registerTableStoreSink();
+    private static String prepareManagedTableDdl(
+            String sourceTableName, String managedTableName, Map<String, String> tableOptions) {
+        StringBuilder ddl =
+                new StringBuilder("CREATE TABLE IF NOT EXISTS ")
+                        .append(String.format("`%s`", managedTableName));
+        if (tableOptions.size() > 0) {
+            ddl.append(" WITH (\n");
+            tableOptions.forEach(
+                    (k, v) ->
+                            ddl.append("  ")
+                                    .append(String.format("'%s'", k))
+                                    .append(" = ")
+                                    .append(String.format("'%s',\n", v)));
+            int len = ddl.length();
+            ddl.delete(len - 2, len);
+            ddl.append(")");
+        }
+        ddl.append(String.format(" LIKE `%s` (EXCLUDING OPTIONS)\n", sourceTableName));
+        return ddl.toString();
     }
 
-    private void registerInsertOnlyRecordsWithoutPk(boolean duplicate) {
-        tEnv.executeSql(
-                String.format(
-                        "CREATE TABLE source_table (\n"
-                                + "  name STRING NOT NULL,\n"
-                                + "  state STRING NOT NULL,\n"
-                                + "  pop INT NOT NULL\n"
-                                + ") WITH (\n"
-                                + " 'connector' = 'values',\n"
-                                + " 'bounded' = '%s',\n"
-                                + " 'data-id' = '%s',\n"
-                                + " 'changelog-mode' = 'I'\n"
-                                + ")",
-                        executionMode == RuntimeExecutionMode.BATCH,
-                        registerData(insertOnlyCities(duplicate))));
-        registerTableStoreSink();
+    private static String prepareInsertIntoQuery(String sourceTableName, String managedTableName) {
+        return prepareInsertIntoQuery(
+                sourceTableName, managedTableName, Collections.emptyMap(), Collections.emptyMap());
     }
 
-    private void registerInsertOnlyRecordsWithPk() {
-        tEnv.executeSql(
-                String.format(
-                        "CREATE TABLE source_table (\n"
-                                + "  currency STRING,\n"
-                                + "  rate BIGINT,\n"
-                                + "  PRIMARY KEY (currency) NOT ENFORCED\n"
-                                + ") WITH (\n"
-                                + "  'connector' = 'values',\n"
-                                + " 'bounded' = '%s',\n"
-                                + "  'data-id' = '%s',\n"
-                                + "  'changelog-mode' = 'I',\n"
-                                + "  'disable-lookup' = 'true'\n"
-                                + ")",
-                        executionMode == RuntimeExecutionMode.BATCH, registerData(rates())));
-        registerTableStoreSink();
+    private static String prepareInsertIntoQuery(
+            String sourceTableName,
+            String managedTableName,
+            Map<String, String> partitions,
+            Map<String, String> hints) {
+        StringBuilder insertDmlBuilder =
+                new StringBuilder(String.format("INSERT INTO `%s`", managedTableName));
+        if (partitions.size() > 0) {
+            insertDmlBuilder.append(" PARTITION (");
+            partitions.forEach(
+                    (k, v) -> {
+                        insertDmlBuilder.append(String.format("'%s'", k));
+                        insertDmlBuilder.append(" = ");
+                        insertDmlBuilder.append(String.format("'%s', ", v));
+                    });
+            int len = insertDmlBuilder.length();
+            insertDmlBuilder.deleteCharAt(len - 1);
+            insertDmlBuilder.append(")");
+        }
+        insertDmlBuilder.append(String.format("\n SELECT * FROM `%s`", sourceTableName));
+        insertDmlBuilder.append(buildHints(hints));
+
+        return insertDmlBuilder.toString();
+    }
+
+    private static String prepareInsertOverwriteQuery(
+            String managedTableName,
+            Map<String, String> staticPartitions,
+            List<String[]> overwriteRecords) {
+        StringBuilder insertDmlBuilder =
+                new StringBuilder(String.format("INSERT OVERWRITE `%s`", managedTableName));
+        if (staticPartitions.size() > 0) {
+            insertDmlBuilder.append(" PARTITION (");
+            staticPartitions.forEach(
+                    (k, v) -> {
+                        insertDmlBuilder.append(String.format("%s", k));
+                        insertDmlBuilder.append(" = ");
+                        insertDmlBuilder.append(String.format("%s, ", v));
+                    });
+            int len = insertDmlBuilder.length();
+            insertDmlBuilder.delete(len - 2, len);
+            insertDmlBuilder.append(")");
+        }
+        insertDmlBuilder.append("\n VALUES ");
+        overwriteRecords.forEach(
+                record -> {
+                    int arity = record.length;
+                    insertDmlBuilder.append("(");
+                    IntStream.range(0, arity)
+                            .forEach(i -> insertDmlBuilder.append(record[i]).append(", "));
+
+                    if (arity > 0) {
+                        int len = insertDmlBuilder.length();
+                        insertDmlBuilder.delete(len - 2, len);
+                    }
+                    insertDmlBuilder.append("), ");
+                });
+        int len = insertDmlBuilder.length();
+        insertDmlBuilder.delete(len - 2, len);
+        return insertDmlBuilder.toString();
+    }
+
+    private static String prepareSimpleSelectQuery(String tableName, Map<String, String> hints) {
+        return String.format("SELECT * FROM `%s` %s", tableName, buildHints(hints));
+    }
+
+    private static String buildHints(Map<String, String> hints) {
+        if (hints.size() > 0) {
+            StringBuilder hintsBuilder = new StringBuilder("/* + OPTIONS (");
+            hints.forEach(
+                    (k, v) -> {
+                        hintsBuilder.append(String.format("'%s'", k));
+                        hintsBuilder.append(" = ");
+                        hintsBuilder.append(String.format("'%s', ", v));
+                    });
+            int len = hintsBuilder.length();
+            hintsBuilder.deleteCharAt(len - 1);
+            hintsBuilder.append(") */");
+            return hintsBuilder.toString();
+        }
+        return "";
+    }
+
+    private static String prepareHelperSourceWithInsertOnlyRecords(
+            String sourceTable, boolean partitioned, boolean hasPk) {
+        return prepareHelperSourceRecords(
+                RuntimeExecutionMode.BATCH, sourceTable, partitioned, hasPk);
+    }
+
+    private static String prepareHelperSourceWithChangelogRecords(
+            String sourceTable, boolean partitioned, boolean hasPk) {
+        return prepareHelperSourceRecords(
+                RuntimeExecutionMode.STREAMING, sourceTable, partitioned, hasPk);
     }
 
-    private void registerUpsertRecordsWithPk() {
-        tEnv.executeSql(
+    /**
+     * Prepare helper source table ddl according to different input parameter.
+     *
+     * <pre> E.g. pk with partition
+     *   {@code
+     *   CREATE TABLE source_table (
+     *     currency STRING,
+     *     rate BIGINT,
+     *     dt STRING) PARTITIONED BY (dt)
+     *    WITH (
+     *      'connector' = 'values',
+     *      'bounded' = executionMode == RuntimeExecutionMode.BATCH,
+     *      'partition-list' = '...'
+     *     )
+     *   }
+     * </pre>
+     *
+     * @param executionMode is used to calculate {@code bounded}
+     * @param sourceTable source table name
+     * @param partitioned is used to calculate {@code partition-list}
+     * @param hasPk
+     * @return helper source ddl
+     */
+    private static String prepareHelperSourceRecords(
+            RuntimeExecutionMode executionMode,
+            String sourceTable,
+            boolean partitioned,
+            boolean hasPk) {
+        boolean bounded = executionMode == RuntimeExecutionMode.BATCH;
+        String changelogMode = bounded ? "I" : hasPk ? "I,UA,D" : "I,UA,UB,D";
+        StringBuilder ddlBuilder =
+                new StringBuilder(String.format("CREATE TABLE `%s` (\n", sourceTable))
+                        .append("  currency STRING,\n")
+                        .append("  rate BIGINT");
+        if (partitioned) {
+            ddlBuilder.append(",\n dt STRING");
+        }
+        if (hasPk) {
+            ddlBuilder.append(", \n PRIMARY KEY (currency");
+            if (partitioned) {
+                ddlBuilder.append(", dt");
+            }
+            ddlBuilder.append(") NOT ENFORCED\n");
+        } else {
+            ddlBuilder.append("\n");
+        }
+        ddlBuilder.append(")");
+        if (partitioned) {
+            ddlBuilder.append(" PARTITIONED BY (dt)\n");
+        }
+        List<Row> input;
+        if (bounded) {
+            input = partitioned ? dailyRates() : rates();
+        } else {
+            if (hasPk) {
+                input = partitioned ? dailyRatesChangelogWithoutUB() : ratesChangelogWithoutUB();
+            } else {
+                input = partitioned ? dailyRatesChangelogWithUB() : ratesChangelogWithUB();
+            }
+        }
+        ddlBuilder.append(
                 String.format(
-                        "CREATE TABLE source_table (\n"
-                                + "  currency STRING,\n"
-                                + "  rate BIGINT,\n"
-                                + "  PRIMARY KEY (currency) NOT ENFORCED\n"
-                                + ") WITH (\n"
+                        "  WITH (\n"
                                 + "  'connector' = 'values',\n"
-                                + " 'bounded' = '%s',\n"
+                                + "  'bounded' = '%s',\n"
                                 + "  'data-id' = '%s',\n"
-                                + "  'changelog-mode' = 'UA,D',\n"
-                                + "  'disable-lookup' = 'true'\n"
+                                + "  'changelog-mode' = '%s',\n"
+                                + "  'disable-lookup' = 'true',\n"
+                                + "  'partition-list' = '%s'\n"
                                 + ")",
-                        executionMode == RuntimeExecutionMode.BATCH,
-                        registerData(ratesChangelog())));
-        registerTableStoreSink();
+                        bounded,
+                        registerData(input),
+                        changelogMode,
+                        partitioned ? "dt:2022-01-01;dt:2022-01-02" : ""));
+        return ddlBuilder.toString();
     }
 
     private static List<Row> rates() {
@@ -249,10 +597,22 @@ public class ReadWriteTableITCase extends TableStoreTestBase {
                 changelogRow("+I", "US Dollar", 102L),
                 changelogRow("+I", "Euro", 114L),
                 changelogRow("+I", "Yen", 1L),
+                changelogRow("+I", "Euro", 114L),
                 changelogRow("+I", "Euro", 119L));
     }
 
-    private static List<Row> ratesChangelog() {
+    private static List<Row> dailyRates() {
+        return Arrays.asList(
+                // part = 2022-01-01
+                changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                changelogRow("+I", "Euro", 114L, "2022-01-01"),
+                changelogRow("+I", "Yen", 1L, "2022-01-01"),
+                changelogRow("+I", "Euro", 114L, "2022-01-01"),
+                // part = 2022-01-02
+                changelogRow("+I", "Euro", 119L, "2022-01-02"));
+    }
+
+    static List<Row> ratesChangelogWithoutUB() {
         return Arrays.asList(
                 changelogRow("+I", "US Dollar", 102L),
                 changelogRow("+I", "Euro", 114L),
@@ -264,24 +624,62 @@ public class ReadWriteTableITCase extends TableStoreTestBase {
                 changelogRow("-D", "Yen", 1L));
     }
 
-    private static List<Row> insertOnlyCities(boolean duplicate) {
-        List<Row> cities = JavaConverters.seqAsJavaList(TestData.citiesData());
-        return duplicate
-                ? Stream.concat(cities.stream(), cities.stream()).collect(Collectors.toList())
-                : cities;
+    static List<Row> dailyRatesChangelogWithoutUB() {
+        return Arrays.asList(
+                // part = 2022-01-01
+                changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                changelogRow("+I", "Euro", 114L, "2022-01-01"),
+                changelogRow("+I", "Yen", 1L, "2022-01-01"),
+                changelogRow("+U", "Euro", 116L, "2022-01-01"),
+                changelogRow("-D", "Yen", 1L, "2022-01-01"),
+                changelogRow("-D", "Euro", 116L, "2022-01-01"),
+                // part = 2022-01-02
+                changelogRow("+I", "Euro", 119L, "2022-01-02"),
+                changelogRow("+U", "Euro", 119L, "2022-01-02"));
     }
 
-    private static List<Row> userChangelog() {
+    private static List<Row> ratesChangelogWithUB() {
         return Arrays.asList(
-                changelogRow("+I", "user1", "Tom", "tom123@gmail.com", new BigDecimal("8.10")),
-                changelogRow("+I", "user3", "Bailey", "bailey@qq.com", new BigDecimal("9.99")),
-                changelogRow("+I", "user4", "Tina", "tina@gmail.com", new BigDecimal("11.30")));
+                changelogRow("+I", "US Dollar", 102L),
+                changelogRow("+I", "Euro", 114L),
+                changelogRow("+I", "Yen", 1L),
+                changelogRow("-U", "Euro", 114L),
+                changelogRow("+U", "Euro", 116L),
+                changelogRow("-D", "Euro", 116L),
+                changelogRow("+I", "Euro", 119L),
+                changelogRow("-U", "Euro", 119L),
+                changelogRow("+U", "Euro", 119L),
+                changelogRow("-D", "Yen", 1L));
     }
 
-    private void registerTableStoreSink() {
-        tEnv.executeSql(
-                String.format(
-                        "CREATE TABLE %s LIKE `source_table` (EXCLUDING OPTIONS)",
-                        tableIdentifier.asSerializableString()));
+    private static List<Row> dailyRatesChangelogWithUB() {
+        return Arrays.asList(
+                // part = 2022-01-01
+                changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+                changelogRow("+I", "Euro", 116L, "2022-01-01"),
+                changelogRow("-D", "Euro", 116L, "2022-01-01"),
+                changelogRow("+I", "Yen", 1L, "2022-01-01"),
+                changelogRow("-D", "Yen", 1L, "2022-01-01"),
+                // part = 2022-01-02
+                changelogRow("+I", "Euro", 114L, "2022-01-02"),
+                changelogRow("-U", "Euro", 114L, "2022-01-02"),
+                changelogRow("+U", "Euro", 119L, "2022-01-02"),
+                changelogRow("-D", "Euro", 119L, "2022-01-02"),
+                changelogRow("+I", "Euro", 115L, "2022-01-02"));
+    }
+
+    private static StreamExecutionEnvironment buildStreamEnv() {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.enableCheckpointing(100);
+        env.setParallelism(2);
+        return env;
+    }
+
+    private static StreamExecutionEnvironment buildBatchEnv() {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(2);
+        return env;
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index 0fe91af..191a197 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -154,6 +154,7 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
     protected static class ExpectedResult {
         protected boolean success;
         protected List<Row> expectedRecords;
+        protected boolean failureHasCause;
         protected Class<? extends Throwable> expectedType;
         protected String expectedMessage;
 
@@ -167,6 +168,11 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
             return this;
         }
 
+        ExpectedResult failureHasCause(boolean failureHasCause) {
+            this.failureHasCause = failureHasCause;
+            return this;
+        }
+
         ExpectedResult expectedType(Class<? extends Throwable> exceptionClazz) {
             this.expectedType = exceptionClazz;
             return this;
@@ -184,6 +190,8 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
                     + success
                     + ", expectedRecords="
                     + expectedRecords
+                    + ", failureHasCause="
+                    + failureHasCause
                     + ", expectedType="
                     + expectedType
                     + ", expectedMessage='"
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index e2ef3e7..de48269 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -122,6 +122,7 @@ public class StoreSinkTest {
                         fileStore,
                         partitions,
                         primaryKeys,
+                        primaryKeys,
                         2,
                         () -> lock,
                         new HashMap<>(),
@@ -252,6 +253,7 @@ public class StoreSinkTest {
                 fileStore,
                 partitions,
                 primaryKeys,
+                primaryKeys,
                 2,
                 () -> lock,
                 overwritePartition,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
index 2280418..112f772 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
@@ -25,6 +25,9 @@ import org.apache.flink.table.store.codegen.CodeGenUtils;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
 import java.util.stream.IntStream;
 
 /** Converter for converting {@link RowData} to {@link SinkRecord}. */
@@ -38,14 +41,24 @@ public class SinkRecordConverter {
 
     private final Projection<RowData, BinaryRowData> pkProjection;
 
+    @Nullable private final Projection<RowData, BinaryRowData> logPkProjection;
+
     public SinkRecordConverter(
-            int numBucket, RowType inputType, int[] partitions, int[] primaryKeys) {
+            int numBucket,
+            RowType inputType,
+            int[] partitions,
+            int[] primaryKeys,
+            int[] logPrimaryKeys) {
         this.numBucket = numBucket;
         this.allProjection =
                 CodeGenUtils.newProjection(
                         inputType, IntStream.range(0, inputType.getFieldCount()).toArray());
         this.partProjection = CodeGenUtils.newProjection(inputType, partitions);
         this.pkProjection = CodeGenUtils.newProjection(inputType, primaryKeys);
+        this.logPkProjection =
+                Arrays.equals(primaryKeys, logPrimaryKeys)
+                        ? null
+                        : CodeGenUtils.newProjection(inputType, logPrimaryKeys);
     }
 
     public SinkRecord convert(RowData row) {
@@ -55,10 +68,23 @@ public class SinkRecordConverter {
         return new SinkRecord(partition, bucket, primaryKey, row);
     }
 
+    public SinkRecord convertToLogSinkRecord(SinkRecord record) {
+        if (logPkProjection == null) {
+            return record;
+        }
+        BinaryRowData logPrimaryKey = logPrimaryKey(record.row());
+        return new SinkRecord(record.partition(), record.bucket(), logPrimaryKey, record.row());
+    }
+
     public BinaryRowData primaryKey(RowData row) {
         return pkProjection.apply(row);
     }
 
+    private BinaryRowData logPrimaryKey(RowData row) {
+        assert logPkProjection != null;
+        return logPkProjection.apply(row);
+    }
+
     public int bucket(RowData row, BinaryRowData primaryKey) {
         int hash = primaryKey.getArity() == 0 ? hashRow(row) : primaryKey.hashCode();
         return Math.abs(hash % numBucket);
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
index 85b9cc0..e084bb9 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
@@ -46,7 +46,7 @@ public class KafkaLogSinkProvider implements LogSinkProvider {
 
     private final Properties properties;
 
-    @Nullable private final SerializationSchema<RowData> keySerializer;
+    @Nullable private final SerializationSchema<RowData> primaryKeySerializer;
 
     private final SerializationSchema<RowData> valueSerializer;
 
@@ -57,13 +57,13 @@ public class KafkaLogSinkProvider implements LogSinkProvider {
     public KafkaLogSinkProvider(
             String topic,
             Properties properties,
-            @Nullable SerializationSchema<RowData> keySerializer,
+            @Nullable SerializationSchema<RowData> primaryKeySerializer,
             SerializationSchema<RowData> valueSerializer,
             LogConsistency consistency,
             LogChangelogMode changelogMode) {
         this.topic = topic;
         this.properties = properties;
-        this.keySerializer = keySerializer;
+        this.primaryKeySerializer = primaryKeySerializer;
         this.valueSerializer = valueSerializer;
         this.consistency = consistency;
         this.changelogMode = changelogMode;
@@ -78,7 +78,7 @@ public class KafkaLogSinkProvider implements LogSinkProvider {
                         .setTransactionalIdPrefix("log-store-" + topic);
                 break;
             case EVENTUAL:
-                if (keySerializer == null) {
+                if (primaryKeySerializer == null) {
                     throw new IllegalArgumentException(
                             "Can not use EVENTUAL consistency mode for non-pk table.");
                 }
@@ -101,6 +101,6 @@ public class KafkaLogSinkProvider implements LogSinkProvider {
     @VisibleForTesting
     KafkaLogSerializationSchema createSerializationSchema() {
         return new KafkaLogSerializationSchema(
-                topic, keySerializer, valueSerializer, changelogMode);
+                topic, primaryKeySerializer, valueSerializer, changelogMode);
     }
 }
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index 58386fb..a5e787b 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -197,11 +197,11 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
         FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
         ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
         DataType physicalType = schema.toPhysicalRowDataType();
-        SerializationSchema<RowData> keySerializer = null;
+        SerializationSchema<RowData> primaryKeySerializer = null;
         int[] primaryKey = schema.getPrimaryKeyIndexes();
         if (primaryKey.length > 0) {
             DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey);
-            keySerializer =
+            primaryKeySerializer =
                     LogStoreTableFactory.getKeyEncodingFormat(helper)
                             .createRuntimeEncoder(sinkContext, keyType);
         }
@@ -211,7 +211,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
         return new KafkaLogSinkProvider(
                 topic(context),
                 toKafkaProperties(helper.getOptions()),
-                keySerializer,
+                primaryKeySerializer,
                 valueSerializer,
                 helper.getOptions().get(CONSISTENCY),
                 helper.getOptions().get(CHANGELOG_MODE));