Posted to commits@flink.apache.org by lz...@apache.org on 2022/03/25 06:53:36 UTC
[flink-table-store] branch master updated: [FLINK-26669] Refactor ReadWriteTableITCase
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 36f045c [FLINK-26669] Refactor ReadWriteTableITCase
36f045c is described below
commit 36f045c55c2095612ace2637c6f4a03da2837467
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Fri Mar 25 14:53:31 2022 +0800
[FLINK-26669] Refactor ReadWriteTableITCase
This closes #58
---
.../flink/table/store/connector/TableStore.java | 21 +-
.../connector/sink/BucketStreamPartitioner.java | 11 +-
.../table/store/connector/sink/StoreSink.java | 7 +-
.../store/connector/sink/StoreSinkWriter.java | 3 +-
.../store/connector/ReadWriteTableITCase.java | 786 ++++++++++++++++-----
.../table/store/connector/TableStoreTestBase.java | 8 +
.../table/store/connector/sink/StoreSinkTest.java | 2 +
.../table/store/sink/SinkRecordConverter.java | 28 +-
.../table/store/kafka/KafkaLogSinkProvider.java | 10 +-
.../table/store/kafka/KafkaLogStoreFactory.java | 6 +-
10 files changed, 669 insertions(+), 213 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index a33c06f..9374933 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -83,9 +83,12 @@ public class TableStore {
/** partition keys, default no partition. */
private int[] partitions = new int[0];
- /** primary keys, default no key. */
+ /** file store primary keys which exclude partition fields if partitioned, default no key. */
private int[] primaryKeys = new int[0];
+ /** log store primary keys which include partition fields if partitioned, default no key. */
+ private int[] logPrimaryKeys = new int[0];
+
private RowType type;
private ObjectIdentifier tableIdentifier;
@@ -112,6 +115,7 @@ public class TableStore {
public TableStore withPrimaryKeys(int[] primaryKeys) {
this.primaryKeys = primaryKeys;
+ this.logPrimaryKeys = primaryKeys;
adjustIndexAndValidate();
return this;
}
@@ -186,15 +190,16 @@ public class TableStore {
}
private void adjustIndexAndValidate() {
- if (primaryKeys.length > 0 && partitions.length > 0) {
- List<Integer> pkList = Arrays.stream(primaryKeys).boxed().collect(Collectors.toList());
+ if (logPrimaryKeys.length > 0 && partitions.length > 0) {
+ List<Integer> pkList =
+ Arrays.stream(logPrimaryKeys).boxed().collect(Collectors.toList());
List<Integer> partitionList =
Arrays.stream(partitions).boxed().collect(Collectors.toList());
String pkInfo =
type == null
? pkList.toString()
- : TypeUtils.project(type, primaryKeys).getFieldNames().toString();
+ : TypeUtils.project(type, logPrimaryKeys).getFieldNames().toString();
String partitionInfo =
type == null
? partitionList.toString()
@@ -205,7 +210,9 @@ public class TableStore {
"Primary key constraint %s should include all partition fields %s",
pkInfo, partitionInfo));
primaryKeys =
- Arrays.stream(primaryKeys).filter(pk -> !partitionList.contains(pk)).toArray();
+ Arrays.stream(logPrimaryKeys)
+ .filter(pk -> !partitionList.contains(pk))
+ .toArray();
Preconditions.checkState(
primaryKeys.length > 0,
@@ -354,7 +361,8 @@ public class TableStore {
int numBucket = options.get(BUCKET);
BucketStreamPartitioner partitioner =
- new BucketStreamPartitioner(numBucket, type, partitions, primaryKeys);
+ new BucketStreamPartitioner(
+ numBucket, type, partitions, primaryKeys, logPrimaryKeys);
DataStream<RowData> partitioned =
new DataStream<>(
input.getExecutionEnvironment(),
@@ -366,6 +374,7 @@ public class TableStore {
fileStore,
partitions,
primaryKeys,
+ logPrimaryKeys,
numBucket,
lockFactory,
overwritePartition,
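
Note (not part of the patch): the hunks above carry the core idea of this change. The declared primary key is kept verbatim for the log store (logPrimaryKeys), while the file store key (primaryKeys) has the partition fields filtered out, after checking that the declared key covers every partition field and does not consist of partition fields only. Below is a minimal, standalone sketch of that adjustment for a hypothetical schema (currency, rate, dt) partitioned by dt and keyed on (currency, dt); the class name and field indices are illustrative assumptions, not code from the commit.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    // Standalone sketch, not the real TableStore class.
    public class PrimaryKeyAdjustmentSketch {

        public static void main(String[] args) {
            // Hypothetical schema (currency, rate, dt) with field indices 0, 1, 2,
            // PRIMARY KEY (currency, dt) and PARTITIONED BY (dt).
            int[] logPrimaryKeys = {0, 2}; // declared pk, kept as-is for the log store
            int[] partitions = {2};

            List<Integer> pkList =
                    Arrays.stream(logPrimaryKeys).boxed().collect(Collectors.toList());
            List<Integer> partitionList =
                    Arrays.stream(partitions).boxed().collect(Collectors.toList());

            // The declared primary key must include every partition field.
            if (!pkList.containsAll(partitionList)) {
                throw new IllegalStateException(
                        "Primary key constraint should include all partition fields");
            }

            // File store primary key: drop the partition fields ...
            int[] primaryKeys =
                    Arrays.stream(logPrimaryKeys)
                            .filter(pk -> !partitionList.contains(pk))
                            .toArray();

            // ... and it must not become empty after the adjustment.
            if (primaryKeys.length == 0) {
                throw new IllegalStateException(
                        "Primary key should not be purely partition fields");
            }

            System.out.println("log store pk indices  = " + Arrays.toString(logPrimaryKeys)); // [0, 2]
            System.out.println("file store pk indices = " + Arrays.toString(primaryKeys));    // [0]
        }
    }

Splitting the two key sets this way keeps file store merging keyed on the non-partition columns, while the log store (for example the Kafka sink further down) can still key its records on the full declared primary key.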
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
index f69ecc0..dedf5d8 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
@@ -33,22 +33,29 @@ public class BucketStreamPartitioner extends StreamPartitioner<RowData> {
private final RowType inputType;
private final int[] partitions;
private final int[] primaryKeys;
+ private final int[] logPrimaryKeys;
private transient SinkRecordConverter recordConverter;
public BucketStreamPartitioner(
- int numBucket, RowType inputType, int[] partitions, int[] primaryKeys) {
+ int numBucket,
+ RowType inputType,
+ int[] partitions,
+ int[] primaryKeys,
+ final int[] logPrimaryKeys) {
this.numBucket = numBucket;
this.inputType = inputType;
this.partitions = partitions;
this.primaryKeys = primaryKeys;
+ this.logPrimaryKeys = logPrimaryKeys;
}
@Override
public void setup(int numberOfChannels) {
super.setup(numberOfChannels);
this.recordConverter =
- new SinkRecordConverter(numBucket, inputType, partitions, primaryKeys);
+ new SinkRecordConverter(
+ numBucket, inputType, partitions, primaryKeys, logPrimaryKeys);
}
@Override
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index a848ca6..88cf8b5 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -63,6 +63,8 @@ public class StoreSink<WriterStateT, LogCommT>
private final int[] primaryKeys;
+ private final int[] logPrimaryKeys;
+
private final int numBucket;
@Nullable private final CatalogLock.Factory lockFactory;
@@ -76,6 +78,7 @@ public class StoreSink<WriterStateT, LogCommT>
FileStore fileStore,
int[] partitions,
int[] primaryKeys,
+ int[] logPrimaryKeys,
int numBucket,
@Nullable CatalogLock.Factory lockFactory,
@Nullable Map<String, String> overwritePartition,
@@ -84,6 +87,7 @@ public class StoreSink<WriterStateT, LogCommT>
this.fileStore = fileStore;
this.partitions = partitions;
this.primaryKeys = primaryKeys;
+ this.logPrimaryKeys = logPrimaryKeys;
this.numBucket = numBucket;
this.lockFactory = lockFactory;
this.overwritePartition = overwritePartition;
@@ -117,7 +121,8 @@ public class StoreSink<WriterStateT, LogCommT>
numBucket,
primaryKeys.length > 0 ? fileStore.valueType() : fileStore.keyType(),
partitions,
- primaryKeys),
+ primaryKeys,
+ logPrimaryKeys),
overwritePartition != null,
logWriter,
logCallback);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index 4effdd8..6a568a6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -105,8 +105,9 @@ public class StoreSinkWriter<WriterStateT>
throw new IOException(e);
}
- // write to log store
+ // write to log store, need to preserve original pk (which includes partition fields)
if (logWriter != null) {
+ record = recordConverter.convertToLogSinkRecord(record);
logWriter.write(record, context);
}
}
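
Note (not part of the patch): the one-line change above is where the new logPrimaryKeys pay off. The record handed to the file store keeps the trimmed key, and immediately before it reaches the log writer it is re-keyed with the original, partition-inclusive primary key via SinkRecordConverter#convertToLogSinkRecord. A rough sketch of that re-keying, using plain arrays instead of the real SinkRecord/BinaryRowData types; the stand-in class and index arrays are assumptions for illustration.

    import java.util.Arrays;

    public class LogRekeySketch {

        /** Simplified stand-in for SinkRecord: a primary key projection plus the full row. */
        static class SimpleRecord {
            final Object[] primaryKey;
            final Object[] row;

            SimpleRecord(Object[] primaryKey, Object[] row) {
                this.primaryKey = primaryKey;
                this.row = row;
            }
        }

        // Assumed schema (currency, rate, dt): the file store pk excludes the partition
        // field dt, the log store pk keeps it.
        static final int[] FILE_PK = {0};
        static final int[] LOG_PK = {0, 2};

        static Object[] project(Object[] row, int[] indices) {
            return Arrays.stream(indices).mapToObj(i -> row[i]).toArray();
        }

        /** Mirrors the idea of SinkRecordConverter#convertToLogSinkRecord. */
        static SimpleRecord convertToLogSinkRecord(SimpleRecord fileRecord) {
            if (Arrays.equals(FILE_PK, LOG_PK)) {
                return fileRecord; // non-partitioned table: nothing to re-key
            }
            return new SimpleRecord(project(fileRecord.row, LOG_PK), fileRecord.row);
        }

        public static void main(String[] args) {
            Object[] row = {"Euro", 119L, "2022-01-02"};
            SimpleRecord fileRecord = new SimpleRecord(project(row, FILE_PK), row);
            // ... the file store write happens first, keyed on the trimmed pk ...

            SimpleRecord logRecord = convertToLogSinkRecord(fileRecord);
            System.out.println(Arrays.toString(fileRecord.primaryKey)); // [Euro]
            System.out.println(Arrays.toString(logRecord.primaryKey));  // [Euro, 2022-01-02]
        }
    }

When the table is not partitioned the two index arrays are equal and the record passes through unchanged, which matches the null logPkProjection short-circuit in SinkRecordConverter further down.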
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 819e018..7b6d648 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -19,229 +19,577 @@
package org.apache.flink.table.store.connector;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
+import org.junit.Ignore;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import javax.annotation.Nullable;
-
-import java.math.BigDecimal;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import scala.collection.JavaConverters;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
+import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** IT cases for testing querying managed table dml. */
-@RunWith(Parameterized.class)
-public class ReadWriteTableITCase extends TableStoreTestBase {
+/** IT cases for managed table dml. */
+public class ReadWriteTableITCase extends KafkaTableTestBase {
- private final boolean hasPk;
- @Nullable private final Boolean duplicate;
+ private String rootPath;
- public ReadWriteTableITCase(
- RuntimeExecutionMode executionMode,
- String tableName,
- boolean enableLogStore,
- boolean hasPk,
- @Nullable Boolean duplicate,
- ExpectedResult expectedResult) {
- super(executionMode, tableName, enableLogStore, expectedResult);
- this.hasPk = hasPk;
- this.duplicate = duplicate;
+ @Test
+ public void testBatchWriteWithPartitionedRecordsWithPk() throws Exception {
+ String managedTable = prepareEnvAndWrite(false, false, true, true);
+
+ // input is dailyRates()
+ List<Row> expectedRecords =
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ changelogRow("+I", "Yen", 1L, "2022-01-01"),
+ changelogRow("+I", "Euro", 114L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 119L, "2022-01-02"));
+ // test batch read
+ collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+ checkFileStorePath(tEnv, managedTable);
+
+ // test streaming read
+ final StreamTableEnvironment streamTableEnv =
+ StreamTableEnvironment.create(buildStreamEnv());
+ registerTable(streamTableEnv, managedTable);
+ BlockingIterator<Row, Row> streamIter =
+ collectAndCheck(
+ streamTableEnv, managedTable, Collections.emptyMap(), expectedRecords);
+
+ // overwrite static partition 2022-01-02
+ prepareEnvAndOverwrite(
+ managedTable,
+ Collections.singletonMap("dt", "'2022-01-02'"),
+ Arrays.asList(new String[] {"'Euro'", "100"}, new String[] {"'Yen'", "1"}));
+
+ // streaming iter will not receive any changelog
+ assertNoMoreRecords(streamIter);
+
+ // batch read to check partition refresh
+ expectedRecords = new ArrayList<>(expectedRecords);
+ expectedRecords.remove(3);
+ expectedRecords.add(changelogRow("+I", "Euro", 100L, "2022-01-02"));
+ expectedRecords.add(changelogRow("+I", "Yen", 1L, "2022-01-02"));
+ collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
}
- @Override
- public void after() {
- tEnv.executeSql("DROP TABLE `source_table`");
- super.after();
+ @Test
+ public void testBatchWriteWithPartitionedRecordsWithoutPk() throws Exception {
+ String managedTable = prepareEnvAndWrite(false, false, true, false);
+
+ // input is dailyRates()
+ List<Row> expectedRecords = dailyRates();
+
+ // test batch read
+ collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+ checkFileStorePath(tEnv, managedTable);
+
+ // test batch read with latest-scan
+ collectAndCheck(
+ tEnv,
+ managedTable,
+ Collections.singletonMap(
+ LogOptions.SCAN.key(),
+ LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
+ Collections.emptyList())
+ .close();
+
+ // overwrite dynamic partition
+ prepareEnvAndOverwrite(
+ managedTable,
+ Collections.emptyMap(),
+ Arrays.asList(
+ new String[] {"'Euro'", "90", "'2022-01-01'"},
+ new String[] {"'Yen'", "2", "'2022-01-02'"}));
+
+ // test streaming read
+ final StreamTableEnvironment streamTableEnv =
+ StreamTableEnvironment.create(buildStreamEnv());
+ registerTable(streamTableEnv, managedTable);
+ collectAndCheck(
+ streamTableEnv,
+ managedTable,
+ Collections.emptyMap(),
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "Euro", 90L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Yen", 2L, "2022-01-02")))
+ .close();
}
@Test
- public void testReadWriteNonPartitioned() throws Exception {
- String statement =
- String.format("INSERT INTO %s \nSELECT * FROM `source_table`", tableIdentifier);
- if (expectedResult.success) {
- tEnv.executeSql(statement).await();
- TableResult result =
- tEnv.executeSql(String.format("SELECT * FROM %s", tableIdentifier));
- List<Row> actual = new ArrayList<>();
- try (CloseableIterator<Row> iterator = result.collect()) {
- while (iterator.hasNext()) {
- actual.add(iterator.next());
- }
- }
- assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedResult.expectedRecords);
- String relativeFilePath = FileStoreOptions.relativeTablePath(tableIdentifier);
- // check snapshot file path
- assertThat(Paths.get(rootPath, relativeFilePath, "snapshot")).exists();
- // check manifest file path
- assertThat(Paths.get(rootPath, relativeFilePath, "manifest")).exists();
-
- if (enableLogStore) {
- assertThat(topicExists(tableIdentifier.asSummaryString())).isTrue();
- }
+ public void testBatchWriteWithNonPartitionedRecordsWithPk() throws Exception {
+ String managedTable = prepareEnvAndWrite(false, false, false, true);
+
+ // input is rates()
+ List<Row> expectedRecords =
+ Arrays.asList(
+ changelogRow("+I", "US Dollar", 102L),
+ changelogRow("+I", "Yen", 1L),
+ changelogRow("+I", "Euro", 119L));
+ collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+ checkFileStorePath(tEnv, managedTable);
+
+ // overwrite the whole table
+ prepareEnvAndOverwrite(
+ managedTable,
+ Collections.emptyMap(),
+ Collections.singletonList(new String[] {"'Euro'", "100"}));
+ expectedRecords = new ArrayList<>(expectedRecords);
+ expectedRecords.clear();
+ expectedRecords.add(changelogRow("+I", "Euro", 100L));
+ collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+ }
+
+ @Test
+ public void testBatchWriteNonPartitionedRecordsWithoutPk() throws Exception {
+ String managedTable = prepareEnvAndWrite(false, false, false, false);
+
+ // input is rates()
+ List<Row> expectedRecords = rates();
+ collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+ checkFileStorePath(tEnv, managedTable);
+ }
+
+ @Ignore("changelog case is failed")
+ @Test
+ public void testEnableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
+ String managedTable = prepareEnvAndWrite(true, true, true, true);
+
+ // input is dailyRatesChangelogWithoutUB()
+ // test hybrid read
+ List<Row> expectedRecords =
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 119L, "2022-01-02"));
+ BlockingIterator<Row, Row> streamIter =
+ collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords);
+ checkFileStorePath(tEnv, managedTable);
+
+ // overwrite partition 2022-01-02
+ prepareEnvAndOverwrite(
+ managedTable,
+ Collections.singletonMap("dt", "'2022-01-02'"),
+ Arrays.asList(new String[] {"'Euro'", "100"}, new String[] {"'Yen'", "1"}));
+
+ // batch read to check data refresh
+ final StreamTableEnvironment batchTableEnv =
+ StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+ registerTable(batchTableEnv, managedTable);
+ collectAndCheck(
+ batchTableEnv,
+ managedTable,
+ Collections.emptyMap(),
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 100L, "2022-01-02"),
+ changelogRow("+I", "Yen", 1L, "2022-01-02")));
+
+ // check no changelog generated for streaming read
+ assertNoMoreRecords(streamIter);
+ }
+
+ @Ignore("file store continuous read is failed, actual has size 1")
+ @Test
+ public void testDisableLogAndStreamingReadWritePartitionedRecordsWithPk() throws Exception {
+ String managedTable = prepareEnvAndWrite(true, false, true, true);
+
+ // disable log store and read from latest
+ collectAndCheck(
+ tEnv,
+ managedTable,
+ Collections.singletonMap(
+ LogOptions.SCAN.key(),
+ LogOptions.LogStartupMode.LATEST.name().toLowerCase()),
+ Collections.emptyList())
+ .close();
+
+ // input is dailyRatesChangelogWithoutUB()
+ // file store continuous read
+ // will not merge, at least collect two records
+ List<Row> expectedRecords =
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 119L, "2022-01-02"));
+ collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+ }
+
+ @Test
+ public void testStreamingReadWritePartitionedRecordsWithoutPk() throws Exception {
+ String managedTable = prepareEnvAndWrite(true, true, true, false);
+
+ // input is dailyRatesChangelogWithUB()
+ // enable log store, file store bounded read with merge
+ List<Row> expectedRecords =
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 115L, "2022-01-02"));
+ collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+ }
+
+ @Test
+ public void testStreamingReadWriteNonPartitionedRecordsWithPk() throws Exception {
+ String managedTable = prepareEnvAndWrite(true, true, false, true);
+
+ // input is ratesChangelogWithoutUB()
+ // enable log store, file store bounded read with merge
+ List<Row> expectedRecords =
+ Arrays.asList(
+ changelogRow("+I", "US Dollar", 102L), changelogRow("+I", "Euro", 119L));
+ collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+ }
+
+ @Test
+ public void testStreamingReadWriteNonPartitionedRecordsWithoutPk() throws Exception {
+ String managedTable = prepareEnvAndWrite(true, true, false, false);
+
+ // input is ratesChangelogWithUB()
+ // enable log store, with default full scan mode, will merge
+ List<Row> expectedRecords =
+ Arrays.asList(
+ changelogRow("+I", "US Dollar", 102L), changelogRow("+I", "Euro", 119L));
+
+ collectAndCheck(tEnv, managedTable, Collections.emptyMap(), expectedRecords).close();
+ }
+
+ // ------------------------ Tools ----------------------------------
+
+ private String prepareEnvAndWrite(
+ boolean streaming, boolean enableLogStore, boolean partitioned, boolean hasPk)
+ throws Exception {
+ Map<String, String> tableOptions = new HashMap<>();
+ rootPath = TEMPORARY_FOLDER.newFolder().getPath();
+ tableOptions.put(FileStoreOptions.FILE_PATH.key(), rootPath);
+ if (enableLogStore) {
+ tableOptions.put(LOG_SYSTEM.key(), "kafka");
+ tableOptions.put(LOG_PREFIX + BOOTSTRAP_SERVERS.key(), getBootstrapServers());
+ }
+ String sourceTable = "source_table_" + UUID.randomUUID();
+ String managedTable = "managed_table_" + UUID.randomUUID();
+ EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().inStreamingMode();
+ String helperTableDdl;
+ if (streaming) {
+ helperTableDdl =
+ prepareHelperSourceWithChangelogRecords(sourceTable, partitioned, hasPk);
+ env = buildStreamEnv();
+ builder.inStreamingMode();
} else {
- assertThatThrownBy(
- () -> {
- tEnv.executeSql(statement).await();
- tEnv.executeSql(String.format("SELECT * FROM %s", tableIdentifier))
- .collect();
- })
- .isInstanceOf(expectedResult.expectedType)
- .hasMessageContaining(expectedResult.expectedMessage);
+ helperTableDdl =
+ prepareHelperSourceWithInsertOnlyRecords(sourceTable, partitioned, hasPk);
+ env = buildBatchEnv();
+ builder.inBatchMode();
}
+ String managedTableDdl = prepareManagedTableDdl(sourceTable, managedTable, tableOptions);
+ String insertQuery = prepareInsertIntoQuery(sourceTable, managedTable);
+
+ tEnv = StreamTableEnvironment.create(env, builder.build());
+ tEnv.executeSql(helperTableDdl);
+ tEnv.executeSql(managedTableDdl);
+ tEnv.executeSql(insertQuery).await();
+ return managedTable;
}
- @Parameterized.Parameters(
- name =
- "executionMode-{0}, tableName-{1}, "
- + "enableLogStore-{2}, hasPk-{3},"
- + " duplicate-{4}, expectedResult-{5}")
- public static List<Object[]> data() {
- List<Object[]> specs = new ArrayList<>();
- // batch cases
- specs.add(
- new Object[] {
- RuntimeExecutionMode.BATCH,
- "table_" + UUID.randomUUID(),
- false, // disable log store
- false, // no pk
- false, // without duplicate
- new ExpectedResult().success(true).expectedRecords(insertOnlyCities(false))
- });
- specs.add(
- new Object[] {
- RuntimeExecutionMode.BATCH,
- "table_" + UUID.randomUUID(),
- false, // disable log store
- false, // no pk
- true, // with duplicate
- new ExpectedResult().success(true).expectedRecords(insertOnlyCities(true))
- });
- List<Row> expected = new ArrayList<>(rates());
- expected.remove(1);
- specs.add(
- new Object[] {
- RuntimeExecutionMode.BATCH,
- "table_" + UUID.randomUUID(),
- false, // disable log store
- true, // has pk
- null, // without duplicate
- new ExpectedResult().success(true).expectedRecords(expected)
- });
- // TODO: streaming with log system
+ private void prepareEnvAndOverwrite(
+ String managedTable,
+ Map<String, String> staticPartitions,
+ List<String[]> overwriteRecords)
+ throws Exception {
+ final StreamTableEnvironment batchEnv =
+ StreamTableEnvironment.create(buildBatchEnv(), EnvironmentSettings.inBatchMode());
+ registerTable(batchEnv, managedTable);
+ String insertQuery =
+ prepareInsertOverwriteQuery(managedTable, staticPartitions, overwriteRecords);
+ batchEnv.executeSql(insertQuery).await();
+ }
- // TODO: add overwrite case
+ private void registerTable(StreamTableEnvironment tEnvToRegister, String managedTable)
+ throws Exception {
+ String cat = this.tEnv.getCurrentCatalog();
+ String db = this.tEnv.getCurrentDatabase();
+ ObjectPath objectPath = new ObjectPath(db, managedTable);
+ CatalogBaseTable table = this.tEnv.getCatalog(cat).get().getTable(objectPath);
+ tEnvToRegister.getCatalog(cat).get().createTable(objectPath, table, false);
+ }
- return specs;
+ private BlockingIterator<Row, Row> collect(
+ StreamTableEnvironment tEnv, String selectQuery, int expectedSize, List<Row> actual)
+ throws Exception {
+ TableResult result = tEnv.executeSql(selectQuery);
+ BlockingIterator<Row, Row> iterator = BlockingIterator.of(result.collect());
+ actual.addAll(iterator.collect(expectedSize));
+ return iterator;
}
- @Override
- protected void prepareEnv() {
- if (hasPk) {
- if (executionMode == RuntimeExecutionMode.STREAMING) {
- registerUpsertRecordsWithPk();
- } else {
- registerInsertOnlyRecordsWithPk();
- }
- } else {
- if (duplicate != null) {
- registerInsertOnlyRecordsWithoutPk(duplicate);
- } else {
- registerInsertUpdateDeleteRecordsWithoutPk();
- }
+ private BlockingIterator<Row, Row> collectAndCheck(
+ StreamTableEnvironment tEnv,
+ String managedTable,
+ Map<String, String> hints,
+ List<Row> expectedRecords)
+ throws Exception {
+ String selectQuery = prepareSimpleSelectQuery(managedTable, hints);
+ List<Row> actual = new ArrayList<>();
+ BlockingIterator<Row, Row> iterator =
+ collect(tEnv, selectQuery, expectedRecords.size(), actual);
+
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRecords);
+ return iterator;
+ }
+
+ private void checkFileStorePath(StreamTableEnvironment tEnv, String managedTable) {
+ String relativeFilePath =
+ FileStoreOptions.relativeTablePath(
+ ObjectIdentifier.of(
+ tEnv.getCurrentCatalog(), tEnv.getCurrentDatabase(), managedTable));
+ // check snapshot file path
+ assertThat(Paths.get(rootPath, relativeFilePath, "snapshot")).exists();
+ // check manifest file path
+ assertThat(Paths.get(rootPath, relativeFilePath, "manifest")).exists();
+ }
+
+ private static void assertNoMoreRecords(BlockingIterator<Row, Row> iterator) {
+ List<Row> expectedRecords = Collections.emptyList();
+ try {
+ // set expectation size to 1 to let time pass by until timeout
+ expectedRecords = iterator.collect(1, 1L, TimeUnit.MINUTES);
+ iterator.close();
+ } catch (Exception ignored) {
+ // don't throw exception
}
+ assertThat(expectedRecords).isEmpty();
}
- private void registerInsertUpdateDeleteRecordsWithoutPk() {
- tEnv.executeSql(
- String.format(
- "CREATE TABLE source_table (\n"
- + " user_id STRING,\n"
- + " user_name STRING,\n"
- + " email STRING,\n"
- + " balance DECIMAL(18,2)\n"
- + ") WITH (\n"
- + " 'connector' = 'values',\n"
- + " 'bounded' = '%s',\n"
- + " 'data-id' = '%s',\n"
- + " 'changelog-mode' = 'I,UA,UB,D',\n"
- + " 'disable-lookup' = 'true'\n"
- + ")",
- executionMode == RuntimeExecutionMode.BATCH,
- registerData(TestData.userChangelog())));
- registerTableStoreSink();
+ private static String prepareManagedTableDdl(
+ String sourceTableName, String managedTableName, Map<String, String> tableOptions) {
+ StringBuilder ddl =
+ new StringBuilder("CREATE TABLE IF NOT EXISTS ")
+ .append(String.format("`%s`", managedTableName));
+ if (tableOptions.size() > 0) {
+ ddl.append(" WITH (\n");
+ tableOptions.forEach(
+ (k, v) ->
+ ddl.append(" ")
+ .append(String.format("'%s'", k))
+ .append(" = ")
+ .append(String.format("'%s',\n", v)));
+ int len = ddl.length();
+ ddl.delete(len - 2, len);
+ ddl.append(")");
+ }
+ ddl.append(String.format(" LIKE `%s` (EXCLUDING OPTIONS)\n", sourceTableName));
+ return ddl.toString();
}
- private void registerInsertOnlyRecordsWithoutPk(boolean duplicate) {
- tEnv.executeSql(
- String.format(
- "CREATE TABLE source_table (\n"
- + " name STRING NOT NULL,\n"
- + " state STRING NOT NULL,\n"
- + " pop INT NOT NULL\n"
- + ") WITH (\n"
- + " 'connector' = 'values',\n"
- + " 'bounded' = '%s',\n"
- + " 'data-id' = '%s',\n"
- + " 'changelog-mode' = 'I'\n"
- + ")",
- executionMode == RuntimeExecutionMode.BATCH,
- registerData(insertOnlyCities(duplicate))));
- registerTableStoreSink();
+ private static String prepareInsertIntoQuery(String sourceTableName, String managedTableName) {
+ return prepareInsertIntoQuery(
+ sourceTableName, managedTableName, Collections.emptyMap(), Collections.emptyMap());
}
- private void registerInsertOnlyRecordsWithPk() {
- tEnv.executeSql(
- String.format(
- "CREATE TABLE source_table (\n"
- + " currency STRING,\n"
- + " rate BIGINT,\n"
- + " PRIMARY KEY (currency) NOT ENFORCED\n"
- + ") WITH (\n"
- + " 'connector' = 'values',\n"
- + " 'bounded' = '%s',\n"
- + " 'data-id' = '%s',\n"
- + " 'changelog-mode' = 'I',\n"
- + " 'disable-lookup' = 'true'\n"
- + ")",
- executionMode == RuntimeExecutionMode.BATCH, registerData(rates())));
- registerTableStoreSink();
+ private static String prepareInsertIntoQuery(
+ String sourceTableName,
+ String managedTableName,
+ Map<String, String> partitions,
+ Map<String, String> hints) {
+ StringBuilder insertDmlBuilder =
+ new StringBuilder(String.format("INSERT INTO `%s`", managedTableName));
+ if (partitions.size() > 0) {
+ insertDmlBuilder.append(" PARTITION (");
+ partitions.forEach(
+ (k, v) -> {
+ insertDmlBuilder.append(String.format("'%s'", k));
+ insertDmlBuilder.append(" = ");
+ insertDmlBuilder.append(String.format("'%s', ", v));
+ });
+ int len = insertDmlBuilder.length();
+ insertDmlBuilder.deleteCharAt(len - 1);
+ insertDmlBuilder.append(")");
+ }
+ insertDmlBuilder.append(String.format("\n SELECT * FROM `%s`", sourceTableName));
+ insertDmlBuilder.append(buildHints(hints));
+
+ return insertDmlBuilder.toString();
+ }
+
+ private static String prepareInsertOverwriteQuery(
+ String managedTableName,
+ Map<String, String> staticPartitions,
+ List<String[]> overwriteRecords) {
+ StringBuilder insertDmlBuilder =
+ new StringBuilder(String.format("INSERT OVERWRITE `%s`", managedTableName));
+ if (staticPartitions.size() > 0) {
+ insertDmlBuilder.append(" PARTITION (");
+ staticPartitions.forEach(
+ (k, v) -> {
+ insertDmlBuilder.append(String.format("%s", k));
+ insertDmlBuilder.append(" = ");
+ insertDmlBuilder.append(String.format("%s, ", v));
+ });
+ int len = insertDmlBuilder.length();
+ insertDmlBuilder.delete(len - 2, len);
+ insertDmlBuilder.append(")");
+ }
+ insertDmlBuilder.append("\n VALUES ");
+ overwriteRecords.forEach(
+ record -> {
+ int arity = record.length;
+ insertDmlBuilder.append("(");
+ IntStream.range(0, arity)
+ .forEach(i -> insertDmlBuilder.append(record[i]).append(", "));
+
+ if (arity > 0) {
+ int len = insertDmlBuilder.length();
+ insertDmlBuilder.delete(len - 2, len);
+ }
+ insertDmlBuilder.append("), ");
+ });
+ int len = insertDmlBuilder.length();
+ insertDmlBuilder.delete(len - 2, len);
+ return insertDmlBuilder.toString();
+ }
+
+ private static String prepareSimpleSelectQuery(String tableName, Map<String, String> hints) {
+ return String.format("SELECT * FROM `%s` %s", tableName, buildHints(hints));
+ }
+
+ private static String buildHints(Map<String, String> hints) {
+ if (hints.size() > 0) {
+ StringBuilder hintsBuilder = new StringBuilder("/* + OPTIONS (");
+ hints.forEach(
+ (k, v) -> {
+ hintsBuilder.append(String.format("'%s'", k));
+ hintsBuilder.append(" = ");
+ hintsBuilder.append(String.format("'%s', ", v));
+ });
+ int len = hintsBuilder.length();
+ hintsBuilder.deleteCharAt(len - 1);
+ hintsBuilder.append(") */");
+ return hintsBuilder.toString();
+ }
+ return "";
+ }
+
+ private static String prepareHelperSourceWithInsertOnlyRecords(
+ String sourceTable, boolean partitioned, boolean hasPk) {
+ return prepareHelperSourceRecords(
+ RuntimeExecutionMode.BATCH, sourceTable, partitioned, hasPk);
+ }
+
+ private static String prepareHelperSourceWithChangelogRecords(
+ String sourceTable, boolean partitioned, boolean hasPk) {
+ return prepareHelperSourceRecords(
+ RuntimeExecutionMode.STREAMING, sourceTable, partitioned, hasPk);
}
- private void registerUpsertRecordsWithPk() {
- tEnv.executeSql(
+ /**
+ * Prepare the helper source table DDL according to the given input parameters.
+ *
+ * <pre> E.g. pk with partition
+ * {@code
+ * CREATE TABLE source_table (
+ * currency STRING,
+ * rate BIGINT,
+ * dt STRING) PARTITIONED BY (dt)
+ * WITH (
+ * 'connector' = 'values',
+ * 'bounded' = executionMode == RuntimeExecutionMode.BATCH,
+ * 'partition-list' = '...'
+ * )
+ * }
+ * </pre>
+ *
+ * @param executionMode is used to calculate {@code bounded}
+ * @param sourceTable source table name
+ * @param partitioned is used to calculate {@code partition-list}
+ * @param hasPk whether the source table declares a primary key
+ * @return helper source ddl
+ */
+ private static String prepareHelperSourceRecords(
+ RuntimeExecutionMode executionMode,
+ String sourceTable,
+ boolean partitioned,
+ boolean hasPk) {
+ boolean bounded = executionMode == RuntimeExecutionMode.BATCH;
+ String changelogMode = bounded ? "I" : hasPk ? "I,UA,D" : "I,UA,UB,D";
+ StringBuilder ddlBuilder =
+ new StringBuilder(String.format("CREATE TABLE `%s` (\n", sourceTable))
+ .append(" currency STRING,\n")
+ .append(" rate BIGINT");
+ if (partitioned) {
+ ddlBuilder.append(",\n dt STRING");
+ }
+ if (hasPk) {
+ ddlBuilder.append(", \n PRIMARY KEY (currency");
+ if (partitioned) {
+ ddlBuilder.append(", dt");
+ }
+ ddlBuilder.append(") NOT ENFORCED\n");
+ } else {
+ ddlBuilder.append("\n");
+ }
+ ddlBuilder.append(")");
+ if (partitioned) {
+ ddlBuilder.append(" PARTITIONED BY (dt)\n");
+ }
+ List<Row> input;
+ if (bounded) {
+ input = partitioned ? dailyRates() : rates();
+ } else {
+ if (hasPk) {
+ input = partitioned ? dailyRatesChangelogWithoutUB() : ratesChangelogWithoutUB();
+ } else {
+ input = partitioned ? dailyRatesChangelogWithUB() : ratesChangelogWithUB();
+ }
+ }
+ ddlBuilder.append(
String.format(
- "CREATE TABLE source_table (\n"
- + " currency STRING,\n"
- + " rate BIGINT,\n"
- + " PRIMARY KEY (currency) NOT ENFORCED\n"
- + ") WITH (\n"
+ " WITH (\n"
+ " 'connector' = 'values',\n"
- + " 'bounded' = '%s',\n"
+ + " 'bounded' = '%s',\n"
+ " 'data-id' = '%s',\n"
- + " 'changelog-mode' = 'UA,D',\n"
- + " 'disable-lookup' = 'true'\n"
+ + " 'changelog-mode' = '%s',\n"
+ + " 'disable-lookup' = 'true',\n"
+ + " 'partition-list' = '%s'\n"
+ ")",
- executionMode == RuntimeExecutionMode.BATCH,
- registerData(ratesChangelog())));
- registerTableStoreSink();
+ bounded,
+ registerData(input),
+ changelogMode,
+ partitioned ? "dt:2022-01-01;dt:2022-01-02" : ""));
+ return ddlBuilder.toString();
}
private static List<Row> rates() {
@@ -249,10 +597,22 @@ public class ReadWriteTableITCase extends TableStoreTestBase {
changelogRow("+I", "US Dollar", 102L),
changelogRow("+I", "Euro", 114L),
changelogRow("+I", "Yen", 1L),
+ changelogRow("+I", "Euro", 114L),
changelogRow("+I", "Euro", 119L));
}
- private static List<Row> ratesChangelog() {
+ private static List<Row> dailyRates() {
+ return Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ changelogRow("+I", "Euro", 114L, "2022-01-01"),
+ changelogRow("+I", "Yen", 1L, "2022-01-01"),
+ changelogRow("+I", "Euro", 114L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 119L, "2022-01-02"));
+ }
+
+ static List<Row> ratesChangelogWithoutUB() {
return Arrays.asList(
changelogRow("+I", "US Dollar", 102L),
changelogRow("+I", "Euro", 114L),
@@ -264,24 +624,62 @@ public class ReadWriteTableITCase extends TableStoreTestBase {
changelogRow("-D", "Yen", 1L));
}
- private static List<Row> insertOnlyCities(boolean duplicate) {
- List<Row> cities = JavaConverters.seqAsJavaList(TestData.citiesData());
- return duplicate
- ? Stream.concat(cities.stream(), cities.stream()).collect(Collectors.toList())
- : cities;
+ static List<Row> dailyRatesChangelogWithoutUB() {
+ return Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ changelogRow("+I", "Euro", 114L, "2022-01-01"),
+ changelogRow("+I", "Yen", 1L, "2022-01-01"),
+ changelogRow("+U", "Euro", 116L, "2022-01-01"),
+ changelogRow("-D", "Yen", 1L, "2022-01-01"),
+ changelogRow("-D", "Euro", 116L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 119L, "2022-01-02"),
+ changelogRow("+U", "Euro", 119L, "2022-01-02"));
}
- private static List<Row> userChangelog() {
+ private static List<Row> ratesChangelogWithUB() {
return Arrays.asList(
- changelogRow("+I", "user1", "Tom", "tom123@gmail.com", new BigDecimal("8.10")),
- changelogRow("+I", "user3", "Bailey", "bailey@qq.com", new BigDecimal("9.99")),
- changelogRow("+I", "user4", "Tina", "tina@gmail.com", new BigDecimal("11.30")));
+ changelogRow("+I", "US Dollar", 102L),
+ changelogRow("+I", "Euro", 114L),
+ changelogRow("+I", "Yen", 1L),
+ changelogRow("-U", "Euro", 114L),
+ changelogRow("+U", "Euro", 116L),
+ changelogRow("-D", "Euro", 116L),
+ changelogRow("+I", "Euro", 119L),
+ changelogRow("-U", "Euro", 119L),
+ changelogRow("+U", "Euro", 119L),
+ changelogRow("-D", "Yen", 1L));
}
- private void registerTableStoreSink() {
- tEnv.executeSql(
- String.format(
- "CREATE TABLE %s LIKE `source_table` (EXCLUDING OPTIONS)",
- tableIdentifier.asSerializableString()));
+ private static List<Row> dailyRatesChangelogWithUB() {
+ return Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
+ changelogRow("+I", "Euro", 116L, "2022-01-01"),
+ changelogRow("-D", "Euro", 116L, "2022-01-01"),
+ changelogRow("+I", "Yen", 1L, "2022-01-01"),
+ changelogRow("-D", "Yen", 1L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", "Euro", 114L, "2022-01-02"),
+ changelogRow("-U", "Euro", 114L, "2022-01-02"),
+ changelogRow("+U", "Euro", 119L, "2022-01-02"),
+ changelogRow("-D", "Euro", 119L, "2022-01-02"),
+ changelogRow("+I", "Euro", 115L, "2022-01-02"));
+ }
+
+ private static StreamExecutionEnvironment buildStreamEnv() {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ env.enableCheckpointing(100);
+ env.setParallelism(2);
+ return env;
+ }
+
+ private static StreamExecutionEnvironment buildBatchEnv() {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.setParallelism(2);
+ return env;
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
index 0fe91af..191a197 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreTestBase.java
@@ -154,6 +154,7 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
protected static class ExpectedResult {
protected boolean success;
protected List<Row> expectedRecords;
+ protected boolean failureHasCause;
protected Class<? extends Throwable> expectedType;
protected String expectedMessage;
@@ -167,6 +168,11 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
return this;
}
+ ExpectedResult failureHasCause(boolean failureHasCause) {
+ this.failureHasCause = failureHasCause;
+ return this;
+ }
+
ExpectedResult expectedType(Class<? extends Throwable> exceptionClazz) {
this.expectedType = exceptionClazz;
return this;
@@ -184,6 +190,8 @@ public abstract class TableStoreTestBase extends KafkaTableTestBase {
+ success
+ ", expectedRecords="
+ expectedRecords
+ + ", failureHasCause="
+ + failureHasCause
+ ", expectedType="
+ expectedType
+ ", expectedMessage='"
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index e2ef3e7..de48269 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -122,6 +122,7 @@ public class StoreSinkTest {
fileStore,
partitions,
primaryKeys,
+ primaryKeys,
2,
() -> lock,
new HashMap<>(),
@@ -252,6 +253,7 @@ public class StoreSinkTest {
fileStore,
partitions,
primaryKeys,
+ primaryKeys,
2,
() -> lock,
overwritePartition,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
index 2280418..112f772 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
@@ -25,6 +25,9 @@ import org.apache.flink.table.store.codegen.CodeGenUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
import java.util.stream.IntStream;
/** Converter for converting {@link RowData} to {@link SinkRecord}. */
@@ -38,14 +41,24 @@ public class SinkRecordConverter {
private final Projection<RowData, BinaryRowData> pkProjection;
+ @Nullable private final Projection<RowData, BinaryRowData> logPkProjection;
+
public SinkRecordConverter(
- int numBucket, RowType inputType, int[] partitions, int[] primaryKeys) {
+ int numBucket,
+ RowType inputType,
+ int[] partitions,
+ int[] primaryKeys,
+ int[] logPrimaryKeys) {
this.numBucket = numBucket;
this.allProjection =
CodeGenUtils.newProjection(
inputType, IntStream.range(0, inputType.getFieldCount()).toArray());
this.partProjection = CodeGenUtils.newProjection(inputType, partitions);
this.pkProjection = CodeGenUtils.newProjection(inputType, primaryKeys);
+ this.logPkProjection =
+ Arrays.equals(primaryKeys, logPrimaryKeys)
+ ? null
+ : CodeGenUtils.newProjection(inputType, logPrimaryKeys);
}
public SinkRecord convert(RowData row) {
@@ -55,10 +68,23 @@ public class SinkRecordConverter {
return new SinkRecord(partition, bucket, primaryKey, row);
}
+ public SinkRecord convertToLogSinkRecord(SinkRecord record) {
+ if (logPkProjection == null) {
+ return record;
+ }
+ BinaryRowData logPrimaryKey = logPrimaryKey(record.row());
+ return new SinkRecord(record.partition(), record.bucket(), logPrimaryKey, record.row());
+ }
+
public BinaryRowData primaryKey(RowData row) {
return pkProjection.apply(row);
}
+ private BinaryRowData logPrimaryKey(RowData row) {
+ assert logPkProjection != null;
+ return logPkProjection.apply(row);
+ }
+
public int bucket(RowData row, BinaryRowData primaryKey) {
int hash = primaryKey.getArity() == 0 ? hashRow(row) : primaryKey.hashCode();
return Math.abs(hash % numBucket);
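
Note (not part of the patch): the converter only builds the extra log key projection when the two index arrays actually differ, and bucketing (visible in the context lines just above) keeps using the file store key, falling back to hashing the whole row when there is no key. A tiny sketch with stand-in hashing (Objects.hash / Arrays.hashCode rather than the codegen'd projections and BinaryRowData hash codes):

    import java.util.Arrays;
    import java.util.Objects;

    public class ConverterSketch {

        /** A second projection for the log key is only needed when the index arrays differ. */
        static boolean needsSeparateLogKeyProjection(int[] primaryKeys, int[] logPrimaryKeys) {
            return !Arrays.equals(primaryKeys, logPrimaryKeys);
        }

        /**
         * Bucket assignment as in the context lines above: hash the primary key, or the
         * whole row when there is no key, and fold the hash into [0, numBucket).
         */
        static int bucket(Object[] primaryKey, Object[] row, int numBucket) {
            int hash = primaryKey.length == 0 ? Objects.hash(row) : Arrays.hashCode(primaryKey);
            return Math.abs(hash % numBucket);
        }

        public static void main(String[] args) {
            System.out.println(needsSeparateLogKeyProjection(new int[] {0}, new int[] {0, 2})); // true
            System.out.println(bucket(new Object[] {"Euro"}, new Object[] {"Euro", 119L}, 4));
        }
    }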
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
index 85b9cc0..e084bb9 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
@@ -46,7 +46,7 @@ public class KafkaLogSinkProvider implements LogSinkProvider {
private final Properties properties;
- @Nullable private final SerializationSchema<RowData> keySerializer;
+ @Nullable private final SerializationSchema<RowData> primaryKeySerializer;
private final SerializationSchema<RowData> valueSerializer;
@@ -57,13 +57,13 @@ public class KafkaLogSinkProvider implements LogSinkProvider {
public KafkaLogSinkProvider(
String topic,
Properties properties,
- @Nullable SerializationSchema<RowData> keySerializer,
+ @Nullable SerializationSchema<RowData> primaryKeySerializer,
SerializationSchema<RowData> valueSerializer,
LogConsistency consistency,
LogChangelogMode changelogMode) {
this.topic = topic;
this.properties = properties;
- this.keySerializer = keySerializer;
+ this.primaryKeySerializer = primaryKeySerializer;
this.valueSerializer = valueSerializer;
this.consistency = consistency;
this.changelogMode = changelogMode;
@@ -78,7 +78,7 @@ public class KafkaLogSinkProvider implements LogSinkProvider {
.setTransactionalIdPrefix("log-store-" + topic);
break;
case EVENTUAL:
- if (keySerializer == null) {
+ if (primaryKeySerializer == null) {
throw new IllegalArgumentException(
"Can not use EVENTUAL consistency mode for non-pk table.");
}
@@ -101,6 +101,6 @@ public class KafkaLogSinkProvider implements LogSinkProvider {
@VisibleForTesting
KafkaLogSerializationSchema createSerializationSchema() {
return new KafkaLogSerializationSchema(
- topic, keySerializer, valueSerializer, changelogMode);
+ topic, primaryKeySerializer, valueSerializer, changelogMode);
}
}
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index 58386fb..a5e787b 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -197,11 +197,11 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
DataType physicalType = schema.toPhysicalRowDataType();
- SerializationSchema<RowData> keySerializer = null;
+ SerializationSchema<RowData> primaryKeySerializer = null;
int[] primaryKey = schema.getPrimaryKeyIndexes();
if (primaryKey.length > 0) {
DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey);
- keySerializer =
+ primaryKeySerializer =
LogStoreTableFactory.getKeyEncodingFormat(helper)
.createRuntimeEncoder(sinkContext, keyType);
}
@@ -211,7 +211,7 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
return new KafkaLogSinkProvider(
topic(context),
toKafkaProperties(helper.getOptions()),
- keySerializer,
+ primaryKeySerializer,
valueSerializer,
helper.getOptions().get(CONSISTENCY),
helper.getOptions().get(CHANGELOG_MODE));