Posted to commits@paimon.apache.org by lz...@apache.org on 2023/07/17 09:56:06 UTC
[incubator-paimon] branch master updated: [flink][kafka-cdc] Kafka cdc should dynamically create table (#1564)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3750a4214 [flink][kafka-cdc] Kafka cdc should dynamically create table (#1564)
3750a4214 is described below
commit 3750a42143c2a5eb61c75738272dac17a74d319f
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Mon Jul 17 17:56:01 2023 +0800
[flink][kafka-cdc] Kafka cdc should dynamically create table (#1564)
---
docs/content/how-to/cdc-ingestion.md | 10 +-
.../shortcodes/generated/mysql_sync_database.html | 4 +
...DatabaseSyncMode.java => DatabaseSyncMode.java} | 13 +-
.../paimon/flink/action/cdc/kafka/KafkaSchema.java | 115 ++-----------
.../action/cdc/kafka/KafkaSyncDatabaseAction.java | 179 +++------------------
.../cdc/kafka/KafkaSyncDatabaseActionFactory.java | 20 ---
.../action/cdc/kafka/KafkaSyncTableAction.java | 10 +-
.../action/cdc/kafka/canal/CanalRecordParser.java | 79 +++++++--
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 100 +++---------
.../paimon/flink/action/cdc/mysql/MySqlSchema.java | 11 --
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 27 ++--
.../cdc/mysql/MySqlSyncDatabaseActionFactory.java | 33 +++-
.../action/cdc/mysql/MySqlTableSchemaBuilder.java | 139 ++++++++++++++++
.../cdc/CdcDynamicTableParsingProcessFunction.java | 2 +-
.../apache/paimon/flink/sink/cdc/EventParser.java | 3 +-
.../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 18 +--
.../cdc/NewTableSchemaBuilder.java} | 21 ++-
.../flink/sink/cdc/RichCdcMultiplexRecord.java | 47 ++++--
.../cdc/RichCdcMultiplexRecordEventParser.java | 104 +++++++++++-
.../cdc/RichCdcMultiplexRecordSchemaBuilder.java} | 40 ++---
.../action/cdc/kafka/KafkaActionITCaseBase.java | 14 ++
.../kafka/KafkaCanalSyncDatabaseActionITCase.java | 60 +++----
.../cdc/kafka/KafkaCanalSyncTableActionITCase.java | 44 +++--
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 10 +-
.../paimon/flink/sink/cdc/TestCdcEventParser.java | 7 -
25 files changed, 545 insertions(+), 565 deletions(-)
diff --git a/docs/content/how-to/cdc-ingestion.md b/docs/content/how-to/cdc-ingestion.md
index 3533d9906..9c49ea285 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -113,6 +113,7 @@ To use this feature through `flink run`, run the following shell command.
[--table-suffix <paimon-table-suffix>] \
[--including-tables <mysql-table-name|name-regular-expr>] \
[--excluding-tables <mysql-table-name|name-regular-expr>] \
+ [--mode <sync-mode>] \
[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]
@@ -289,8 +290,6 @@ To use this feature through `flink run`, run the following shell command.
kafka-sync-database
--warehouse <warehouse-path> \
--database <database-name> \
- [--schema-init-max-read <int>] \
- [--ignore-incompatible <true/false>] \
[--table-prefix <paimon-table-prefix>] \
[--table-suffix <paimon-table-suffix>] \
[--including-tables <table-name|name-regular-expr>] \
@@ -304,8 +303,10 @@ To use this feature through `flink run`, run the following shell command.
Only tables with primary keys will be synchronized.
-For each Kafka topic's table to be synchronized, if the corresponding Paimon table does not exist, this action will automatically create the table.
-Its schema will be derived from all specified Kafka topic's tables,it gets the earliest non-DDL data parsing schema from topic. If the Paimon table already exists, its schema will be compared against the schema of all specified Kafka topic's tables.
+This action will build a single combined sink for all tables. For each Kafka topic's table to be synchronized, if the
+corresponding Paimon table does not exist, this action will automatically create the table, and its schema will be derived
+from all specified Kafka topic's tables. If the Paimon table already exists and its schema is different from the one
+parsed from Kafka records, this action will try to perform schema evolution.
Example
@@ -317,7 +318,6 @@ Synchronization from one Kafka topic to Paimon database.
kafka-sync-database \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
- --schema-init-max-read 500 \
--kafka-conf properties.bootstrap.servers=127.0.0.1:9020 \
--kafka-conf topic=order \
--kafka-conf properties.group.id=123456 \
diff --git a/docs/layouts/shortcodes/generated/mysql_sync_database.html b/docs/layouts/shortcodes/generated/mysql_sync_database.html
index 1c1dc789c..cf6ededc0 100644
--- a/docs/layouts/shortcodes/generated/mysql_sync_database.html
+++ b/docs/layouts/shortcodes/generated/mysql_sync_database.html
@@ -53,6 +53,10 @@ under the License.
<td><h5>--excluding-tables</h5></td>
<td>It is used to specify which source tables are not to be synchronized. The usage is same as "--including-tables". "--excluding-tables" has higher priority than "--including-tables" if you specified both.</td>
</tr>
+ <tr>
+ <td><h5>--mode</h5></td>
+ <td>It is used to specify synchronization mode.<br />Possible values:<ul><li>"divided" (the default mode if you haven't specified one): start a sink for each table, the synchronization of the new table requires restarting the job.</li><li>"combined": start a single combined sink for all tables, the new table will be automatically synchronized.</li></ul></td>
+ </tr>
<tr>
<td><h5>--mysql-conf</h5></td>
<td>The configuration for Flink CDC MySQL table sources. Each configuration should be specified in the format "key=value". hostname, username, password, database-name and table-name are required configurations, others are optional. See its <a href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document</a> for a complete list of configurations.</td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
similarity index 72%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
index a7eccba66..8e4641ccd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/DatabaseSyncMode.java
@@ -16,19 +16,20 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.action.cdc.mysql;
+package org.apache.paimon.flink.action.cdc;
import java.io.Serializable;
/**
* There are two modes for database sync.
*
- * <p>1) SEPARATE mode, start a sink for each table, the synchronization of the new table requires
+ * <p>1) DIVIDED mode, start a sink for each table, the synchronization of the new table requires
* restarting the job.
*
- * <p>2) UNIFIED mode, start a unified sink, the new table will be automatically synchronized.
+ * <p>2) COMBINED mode, start a single combined sink for all tables, the new table will be
+ * automatically synchronized.
*/
-public enum MySqlDatabaseSyncMode implements Serializable {
- SEPARATE,
- UNIFIED
+public enum DatabaseSyncMode implements Serializable {
+ DIVIDED,
+ COMBINED
}
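For reference, a minimal sketch of how the COMBINED mode is wired into the database sink builder, using only calls that appear in this commit; input, parserFactory, catalogLoader() and database are assumed to be prepared as in KafkaSyncDatabaseAction further below:

    // Sketch only: the final build/execute step of the builder is omitted here.
    FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
            new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
                    .withInput(input)                 // parsed CDC record stream (assumed)
                    .withParserFactory(parserFactory) // e.g. RichCdcMultiplexRecordEventParser
                    .withCatalogLoader(catalogLoader())
                    .withDatabase(database)
                    .withMode(DatabaseSyncMode.COMBINED);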
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
index 3fc4015ba..deda133de 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
@@ -18,15 +18,9 @@
package org.apache.paimon.flink.action.cdc.kafka;
-import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
+import org.apache.paimon.flink.action.cdc.kafka.canal.CanalRecordParser;
import org.apache.paimon.types.DataType;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-
-import org.apache.commons.compress.utils.Lists;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -36,10 +30,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -50,7 +41,6 @@ import java.util.UUID;
public class KafkaSchema {
private static final int MAX_RETRY = 100;
- private static final int MAX_READ = 1000;
private final String databaseName;
private final String tableName;
@@ -96,45 +86,7 @@ public class KafkaSchema {
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- return consumer;
- }
-
- private static Boolean extractIsDDL(JsonNode record) {
- return Boolean.valueOf(extractJsonNode(record, "isDdl"));
- }
-
- private static String extractJsonNode(JsonNode record, String key) {
- return record != null && record.get(key) != null ? record.get(key).asText() : null;
- }
-
- private static KafkaSchema parseCanalJson(String record, ObjectMapper objectMapper)
- throws JsonProcessingException {
- String databaseName;
- String tableName;
- final Map<String, DataType> fields = new LinkedHashMap<>();
- final List<String> primaryKeys = new ArrayList<>();
- JsonNode root = objectMapper.readValue(record, JsonNode.class);
- if (!extractIsDDL(root)) {
- JsonNode mysqlType = root.get("mysqlType");
- Iterator<String> iterator = mysqlType.fieldNames();
- while (iterator.hasNext()) {
- String fieldName = iterator.next();
- String fieldType = mysqlType.get(fieldName).asText();
- String type = MySqlTypeUtils.getShortType(fieldType);
- int precision = MySqlTypeUtils.getPrecision(fieldType);
- int scale = MySqlTypeUtils.getScale(fieldType);
- fields.put(fieldName, MySqlTypeUtils.toDataType(type, precision, scale));
- }
- ArrayNode pkNames = (ArrayNode) root.get("pkNames");
- for (int i = 0; i < pkNames.size(); i++) {
- primaryKeys.add(pkNames.get(i).asText());
- }
- databaseName = extractJsonNode(root, "database");
- tableName = extractJsonNode(root, "table");
- return new KafkaSchema(databaseName, tableName, fields, primaryKeys);
- }
- return null;
+ return new KafkaConsumer<>(props);
}
public static KafkaSchema getKafkaSchema(Configuration kafkaConfig, String topic)
@@ -142,25 +94,20 @@ public class KafkaSchema {
KafkaConsumer<String, String> consumer = getKafkaEarliestConsumer(kafkaConfig);
consumer.subscribe(Collections.singletonList(topic));
- KafkaSchema kafkaSchema;
int retry = 0;
- ObjectMapper objectMapper = new ObjectMapper();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
- try {
- String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
- if ("canal-json".equals(format)) {
- kafkaSchema = parseCanalJson(record.value(), objectMapper);
- if (kafkaSchema != null) {
- return kafkaSchema;
- }
- } else {
- throw new UnsupportedOperationException(
- "This format: " + format + " is not support.");
+ String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
+ if ("canal-json".equals(format)) {
+ CanalRecordParser parser = new CanalRecordParser(true, Collections.emptyList());
+ KafkaSchema kafkaSchema = parser.getKafkaSchema(record.value());
+ if (kafkaSchema != null) {
+ return kafkaSchema;
}
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
+ } else {
+ throw new UnsupportedOperationException(
+ "This format: " + format + " is not support.");
}
}
if (retry == MAX_RETRY) {
@@ -171,46 +118,6 @@ public class KafkaSchema {
}
}
- public static List<KafkaSchema> getListKafkaSchema(
- Configuration kafkaConfig, String topic, int maxRead) throws Exception {
- KafkaConsumer<String, String> consumer = getKafkaEarliestConsumer(kafkaConfig);
-
- consumer.subscribe(Collections.singletonList(topic));
- List<KafkaSchema> kafkaSchemaList = Lists.newArrayList();
- int retry = 0;
- int read = 0;
- maxRead = maxRead == 0 ? MAX_READ : maxRead;
- ObjectMapper objectMapper = new ObjectMapper();
- while (maxRead > read) {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records) {
- read++;
- try {
- String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
- if ("canal-json".equals(format)) {
- KafkaSchema kafkaSchema = parseCanalJson(record.value(), objectMapper);
- if (kafkaSchema != null && !kafkaSchemaList.contains(kafkaSchema)) {
- kafkaSchemaList.add(kafkaSchema);
- }
- } else {
- throw new UnsupportedOperationException(
- "This format: " + format + " is not support.");
- }
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
- if (kafkaSchemaList.isEmpty() && retry == MAX_RETRY) {
- throw new Exception("Could not get metadata from server,topic :" + topic);
- } else if (!kafkaSchemaList.isEmpty() && retry == MAX_RETRY) {
- break;
- }
- Thread.sleep(100);
- retry++;
- }
- return kafkaSchemaList;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
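As an aside, a minimal usage sketch of the simplified KafkaSchema.getKafkaSchema, which now polls the topic from the earliest offset and delegates schema parsing to CanalRecordParser; the configuration values below are illustrative, mirroring the docs example above:

    // Sketch only: keys follow the Flink Kafka connector options used by this action.
    Map<String, String> kafkaProps = new HashMap<>();
    kafkaProps.put("properties.bootstrap.servers", "127.0.0.1:9020");
    kafkaProps.put("topic", "order");
    kafkaProps.put("value.format", "canal-json");
    Configuration kafkaConfig = Configuration.fromMap(kafkaProps);
    // Returns the schema derived from the first non-DDL canal-json record,
    // retrying up to MAX_RETRY polls before giving up.
    KafkaSchema kafkaSchema = KafkaSchema.getKafkaSchema(kafkaConfig, "order");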
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 07b20c523..b4894a31d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -18,21 +18,17 @@
package org.apache.paimon.flink.action.cdc.kafka;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.kafka.canal.CanalRecordParser;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordSchemaBuilder;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
@@ -40,19 +36,11 @@ import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.util.CollectionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.function.Supplier;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -63,10 +51,10 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
* href="https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/connectors/table/kafka/">document
* of flink-connectors</a> for detailed keys and values.
*
- * <p>For each topic's table to be synchronized, if the corresponding Paimon table does not exist,
- * this action will automatically create the table. Its schema will be derived from all specified
- * tables. If the Paimon table already exists, its schema will be compared against the schema of all
- * specified tables.
+ * <p>For each Kafka topic's table to be synchronized, if the corresponding Paimon table does not
+ * exist, this action will automatically create the table, and its schema will be derived from all
+ * specified Kafka topic's tables. If the Paimon table already exists and its schema is different
+ * from the schema parsed from Kafka records, this action will try to perform schema evolution.
*
* <p>This action supports a limited number of schema changes. Currently, the framework can not drop
* columns, so the behaviors of `DROP` will be ignored, `RENAME` will add a new column. Currently
@@ -88,18 +76,13 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
* are supported.
* </ul>
*
- * <p>This action creates a Paimon table sink for each Paimon table to be written, so this action is
- * not very efficient in resource saving. We may optimize this action by merging all sinks into one
- * instance in the future.
+ * <p>To automatically synchronize new tables, this action creates a single sink for all Paimon
+ * tables to be written. See {@link DatabaseSyncMode#COMBINED}.
*/
public class KafkaSyncDatabaseAction extends ActionBase {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaSyncDatabaseAction.class);
-
private final Configuration kafkaConfig;
private final String database;
- private final int schemaInitMaxRead;
- private final boolean ignoreIncompatible;
private final String tablePrefix;
private final String tableSuffix;
@Nullable private final Pattern includingPattern;
@@ -110,29 +93,15 @@ public class KafkaSyncDatabaseAction extends ActionBase {
Map<String, String> kafkaConfig,
String warehouse,
String database,
- boolean ignoreIncompatible,
Map<String, String> catalogConfig,
Map<String, String> tableConfig) {
- this(
- kafkaConfig,
- warehouse,
- database,
- 0,
- ignoreIncompatible,
- null,
- null,
- null,
- null,
- catalogConfig,
- tableConfig);
+ this(kafkaConfig, warehouse, database, null, null, null, null, catalogConfig, tableConfig);
}
KafkaSyncDatabaseAction(
Map<String, String> kafkaConfig,
String warehouse,
String database,
- int schemaInitMaxRead,
- boolean ignoreIncompatible,
@Nullable String tablePrefix,
@Nullable String tableSuffix,
@Nullable String includingTables,
@@ -142,8 +111,6 @@ public class KafkaSyncDatabaseAction extends ActionBase {
super(warehouse, catalogConfig);
this.kafkaConfig = Configuration.fromMap(kafkaConfig);
this.database = database;
- this.schemaInitMaxRead = schemaInitMaxRead;
- this.ignoreIncompatible = ignoreIncompatible;
this.tablePrefix = tablePrefix == null ? "" : tablePrefix;
this.tableSuffix = tableSuffix == null ? "" : tableSuffix;
this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables);
@@ -165,59 +132,23 @@ public class KafkaSyncDatabaseAction extends ActionBase {
validateCaseInsensitive();
}
- Map<String, List<KafkaSchema>> kafkaCanalSchemaMap = getKafkaCanalSchemaMap();
-
catalog.createDatabase(database, true);
TableNameConverter tableNameConverter =
new TableNameConverter(caseSensitive, tablePrefix, tableSuffix);
- List<FileStoreTable> fileStoreTables = new ArrayList<>();
- List<String> monitoredTopics = new ArrayList<>();
- for (Map.Entry<String, List<KafkaSchema>> kafkaCanalSchemaEntry :
- kafkaCanalSchemaMap.entrySet()) {
- List<KafkaSchema> kafkaSchemaList = kafkaCanalSchemaEntry.getValue();
- String topic = kafkaCanalSchemaEntry.getKey();
- for (KafkaSchema kafkaSchema : kafkaSchemaList) {
- String paimonTableName = tableNameConverter.convert(kafkaSchema.tableName());
- Identifier identifier = new Identifier(database, paimonTableName);
- FileStoreTable table;
- Schema fromCanal =
- KafkaActionUtils.buildPaimonSchema(
- kafkaSchema,
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList(),
- tableConfig,
- caseSensitive);
- try {
- table = (FileStoreTable) catalog.getTable(identifier);
- Supplier<String> errMsg =
- incompatibleMessage(table.schema(), kafkaSchema, identifier);
- if (shouldMonitorTable(table.schema(), fromCanal, errMsg)) {
- monitoredTopics.add(topic);
- fileStoreTables.add(table);
- }
- } catch (Catalog.TableNotExistException e) {
- catalog.createTable(identifier, fromCanal, false);
- table = (FileStoreTable) catalog.getTable(identifier);
- monitoredTopics.add(topic);
- fileStoreTables.add(table);
- }
- }
- }
- monitoredTopics = monitoredTopics.stream().distinct().collect(Collectors.toList());
- Preconditions.checkState(
- !fileStoreTables.isEmpty(),
- "No tables to be synchronized. Possible cause is the schemas of all tables in specified "
- + "Kafka topic's table are not compatible with those of existed Paimon tables. Please check the log.");
-
- kafkaConfig.set(KafkaConnectorOptions.TOPIC, monitoredTopics);
KafkaSource<String> source = KafkaActionUtils.buildKafkaSource(kafkaConfig);
EventParser.Factory<RichCdcMultiplexRecord> parserFactory;
String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
if ("canal-json".equals(format)) {
- parserFactory = RichCdcMultiplexRecordEventParser::new;
+ RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
+ new RichCdcMultiplexRecordSchemaBuilder(tableConfig);
+ Pattern includingPattern = this.includingPattern;
+ Pattern excludingPattern = this.excludingPattern;
+ parserFactory =
+ () ->
+ new RichCdcMultiplexRecordEventParser(
+ schemaBuilder, includingPattern, excludingPattern);
} else {
throw new UnsupportedOperationException("This format: " + format + " is not support.");
}
@@ -233,9 +164,9 @@ public class KafkaSyncDatabaseAction extends ActionBase {
new CanalRecordParser(
caseSensitive, tableNameConverter)))
.withParserFactory(parserFactory)
- .withTables(fileStoreTables)
.withCatalogLoader(catalogLoader())
- .withDatabase(database);
+ .withDatabase(database)
+ .withMode(DatabaseSyncMode.COMBINED);
String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
if (sinkParallelism != null) {
sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
@@ -261,78 +192,6 @@ public class KafkaSyncDatabaseAction extends ActionBase {
tableSuffix));
}
- private Map<String, List<KafkaSchema>> getKafkaCanalSchemaMap() throws Exception {
- Map<String, List<KafkaSchema>> kafkaCanalSchemaMap = new HashMap<>();
- List<String> topicList = kafkaConfig.get(KafkaConnectorOptions.TOPIC);
- if (topicList.size() > 1) {
- topicList.forEach(
- topic -> {
- try {
- KafkaSchema kafkaSchema =
- KafkaSchema.getKafkaSchema(kafkaConfig, topic);
- if (shouldMonitorTable(kafkaSchema.tableName())) {
- kafkaCanalSchemaMap.put(
- topic, Collections.singletonList(kafkaSchema));
- }
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- } else {
- List<KafkaSchema> kafkaSchemaList =
- KafkaSchema.getListKafkaSchema(
- kafkaConfig, topicList.get(0), schemaInitMaxRead);
- kafkaSchemaList =
- kafkaSchemaList.stream()
- .filter(kafkaSchema -> shouldMonitorTable(kafkaSchema.tableName()))
- .collect(Collectors.toList());
- kafkaCanalSchemaMap.put(topicList.get(0), kafkaSchemaList);
- }
-
- return kafkaCanalSchemaMap;
- }
-
- private boolean shouldMonitorTable(String mySqlTableName) {
- boolean shouldMonitor = true;
- if (includingPattern != null) {
- shouldMonitor = includingPattern.matcher(mySqlTableName).matches();
- }
- if (excludingPattern != null) {
- shouldMonitor = shouldMonitor && !excludingPattern.matcher(mySqlTableName).matches();
- }
- LOG.debug("Source table {} is monitored? {}", mySqlTableName, shouldMonitor);
- return shouldMonitor;
- }
-
- private boolean shouldMonitorTable(
- TableSchema tableSchema, Schema schema, Supplier<String> errMsg) {
- if (KafkaActionUtils.schemaCompatible(tableSchema, schema)) {
- return true;
- } else if (ignoreIncompatible) {
- LOG.warn(errMsg.get() + "This table will be ignored.");
- return false;
- } else {
- throw new IllegalArgumentException(
- errMsg.get()
- + "If you want to ignore the incompatible tables, please specify --ignore-incompatible to true.");
- }
- }
-
- private Supplier<String> incompatibleMessage(
- TableSchema paimonSchema, KafkaSchema kafkaSchema, Identifier identifier) {
- return () ->
- String.format(
- "Incompatible schema found.\n"
- + "Paimon table is: %s, fields are: %s.\n"
- + "Kafka's table is: %s.%s, fields are: %s.\n",
- identifier.getFullName(),
- paimonSchema.fields(),
- kafkaSchema.databaseName(),
- kafkaSchema.tableName(),
- kafkaSchema.fields());
- }
-
// ------------------------------------------------------------------------
// Flink run methods
// ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
index 3d2d6c96d..3af9599ac 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseActionFactory.java
@@ -42,14 +42,8 @@ public class KafkaSyncDatabaseActionFactory implements ActionFactory {
checkRequiredArgument(params, "database");
checkRequiredArgument(params, "kafka-conf");
- int schemaInitMaxRead = 1000;
- if (params.has("schema-init-max-read")) {
- schemaInitMaxRead = Integer.parseInt(params.get("schema-init-max-read"));
- }
-
String warehouse = params.get("warehouse");
String database = params.get("database");
- boolean ignoreIncompatible = Boolean.parseBoolean(params.get("ignore-incompatible"));
String tablePrefix = params.get("table-prefix");
String tableSuffix = params.get("table-suffix");
String includingTables = params.get("including-tables");
@@ -63,8 +57,6 @@ public class KafkaSyncDatabaseActionFactory implements ActionFactory {
kafkaConfigOption,
warehouse,
database,
- schemaInitMaxRead,
- ignoreIncompatible,
tablePrefix,
tableSuffix,
includingTables,
@@ -85,8 +77,6 @@ public class KafkaSyncDatabaseActionFactory implements ActionFactory {
System.out.println("Syntax:");
System.out.println(
" kafka-sync-database --warehouse <warehouse-path> --database <database-name> "
- + "[--schema-init-max-read <schema-init-max-read>] "
- + "[--ignore-incompatible <true/false>] "
+ "[--table-prefix <paimon-table-prefix>] "
+ "[--table-suffix <paimon-table-suffix>] "
+ "[--including-tables <table-name|name-regular-expr>] "
@@ -96,16 +86,6 @@ public class KafkaSyncDatabaseActionFactory implements ActionFactory {
+ "[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]");
System.out.println();
- System.out.println(
- "--schema-init-max-read is default 1000, if your tables are all from a topic, you can set this parameter to initialize the number of tables to be synchronized.");
- System.out.println();
-
- System.out.println(
- "--ignore-incompatible is default false, in this case, if Topic's table name exists in Paimon "
- + "and their schema is incompatible, an exception will be thrown. "
- + "You can specify it to true explicitly to ignore the incompatible tables and exception.");
- System.out.println();
-
System.out.println(
"--table-prefix is the prefix of all Paimon tables to be synchronized. For example, if you want all "
+ "synchronized tables to have \"ods_\" as prefix, you can specify `--table-prefix ods_`.");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index d890eeaa7..7ebd1e03f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -156,15 +156,7 @@ public class KafkaSyncTableAction extends ActionBase {
table = (FileStoreTable) catalog.getTable(identifier);
KafkaActionUtils.assertSchemaCompatible(table.schema(), fromCanal);
} catch (Catalog.TableNotExistException e) {
- Schema schema =
- KafkaActionUtils.buildPaimonSchema(
- kafkaSchema,
- partitionKeys,
- primaryKeys,
- computedColumns,
- paimonConfig,
- caseSensitive);
- catalog.createTable(identifier, schema, false);
+ catalog.createTable(identifier, fromCanal, false);
table = (FileStoreTable) catalog.getTable(identifier);
}
String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalRecordParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalRecordParser.java
index 6812aaa8a..2b9c68747 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalRecordParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalRecordParser.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.kafka.canal;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.kafka.KafkaSchema;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
@@ -27,6 +28,7 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -46,6 +48,8 @@ import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlAlterTableModifyCo
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
+import javax.annotation.Nullable;
+
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
@@ -65,6 +69,7 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
private static final String FIELD_TABLE = "table";
private static final String FIELD_SQL = "sql";
private static final String FIELD_MYSQL_TYPE = "mysqlType";
+ private static final String FIELD_PRIMARY_KEYS = "pkNames";
private static final String FIELD_TYPE = "type";
private static final String FIELD_DATA = "data";
private static final String FIELD_OLD = "old";
@@ -104,12 +109,37 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
root = objectMapper.readValue(value, JsonNode.class);
validateFormat();
- databaseName = root.get(FIELD_DATABASE).asText();
- tableName = tableNameConverter.convert(root.get(FIELD_TABLE).asText());
+ databaseName = extractString(FIELD_DATABASE);
+ tableName = tableNameConverter.convert(extractString(FIELD_TABLE));
extractRecords().forEach(out::collect);
}
+ @Nullable
+ public KafkaSchema getKafkaSchema(String record) {
+ try {
+ root = objectMapper.readValue(record, JsonNode.class);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ validateFormat();
+
+ if (isDdl()) {
+ return null;
+ }
+
+ LinkedHashMap<String, String> mySqlFieldTypes = extractFieldTypesFromMySqlType();
+ LinkedHashMap<String, DataType> paimonFieldTypes = new LinkedHashMap<>();
+ mySqlFieldTypes.forEach(
+ (name, type) -> paimonFieldTypes.put(name, toPaimonDataType(type, true)));
+
+ return new KafkaSchema(
+ extractString(FIELD_DATABASE),
+ extractString(FIELD_TABLE),
+ paimonFieldTypes,
+ extractPrimaryKeys());
+ }
+
private void validateFormat() {
String errorMessageTemplate =
"Didn't find '%s' node in json. Only supports canal-json format,"
@@ -124,9 +154,14 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
checkNotNull(root.get(FIELD_SQL), errorMessageTemplate, FIELD_SQL);
} else {
checkNotNull(root.get(FIELD_MYSQL_TYPE), errorMessageTemplate, FIELD_MYSQL_TYPE);
+ checkNotNull(root.get(FIELD_PRIMARY_KEYS), errorMessageTemplate, FIELD_PRIMARY_KEYS);
}
}
+ private String extractString(String key) {
+ return root.get(key).asText();
+ }
+
private boolean isDdl() {
return root.get("isDdl") != null && root.get("isDdl").asBoolean();
}
@@ -136,6 +171,8 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
return extractRecordsFromDdl();
}
+ List<String> primaryKeys = extractPrimaryKeys();
+
// extract field types
LinkedHashMap<String, String> mySqlFieldTypes = extractFieldTypesFromMySqlType();
LinkedHashMap<String, DataType> paimonFieldTypes = new LinkedHashMap<>();
@@ -144,7 +181,7 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
// extract row kind and field values
List<RichCdcMultiplexRecord> records = new ArrayList<>();
- String type = root.get(FIELD_TYPE).asText();
+ String type = extractString(FIELD_TYPE);
ArrayNode data = (ArrayNode) root.get(FIELD_DATA);
switch (type) {
case OP_UPDATE:
@@ -167,18 +204,20 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
before = caseSensitive ? before : keyCaseInsensitive(before);
records.add(
new RichCdcMultiplexRecord(
- new CdcRecord(RowKind.DELETE, before),
- paimonFieldTypes,
databaseName,
- tableName));
+ tableName,
+ paimonFieldTypes,
+ primaryKeys,
+ new CdcRecord(RowKind.DELETE, before)));
}
after = caseSensitive ? after : keyCaseInsensitive(after);
records.add(
new RichCdcMultiplexRecord(
- new CdcRecord(RowKind.INSERT, after),
- paimonFieldTypes,
databaseName,
- tableName));
+ tableName,
+ paimonFieldTypes,
+ primaryKeys,
+ new CdcRecord(RowKind.INSERT, after)));
}
break;
case OP_INSERT:
@@ -190,10 +229,11 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
RowKind kind = type.equals(OP_INSERT) ? RowKind.INSERT : RowKind.DELETE;
records.add(
new RichCdcMultiplexRecord(
- new CdcRecord(kind, after),
- paimonFieldTypes,
databaseName,
- tableName));
+ tableName,
+ paimonFieldTypes,
+ primaryKeys,
+ new CdcRecord(kind, after)));
}
break;
default:
@@ -204,7 +244,7 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
}
private List<RichCdcMultiplexRecord> extractRecordsFromDdl() {
- String sql = root.get(FIELD_SQL).asText();
+ String sql = extractString(FIELD_SQL);
if (StringUtils.isEmpty(sql)) {
return Collections.emptyList();
}
@@ -221,7 +261,11 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
return Collections.singletonList(
new RichCdcMultiplexRecord(
- CdcRecord.emptyRecord(), fieldTypes, databaseName, tableName));
+ databaseName,
+ tableName,
+ fieldTypes,
+ Collections.emptyList(),
+ CdcRecord.emptyRecord()));
}
private void extractFieldTypesFromAlterTableItem(
@@ -276,6 +320,13 @@ public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultipl
return MySqlTypeUtils.toDataType(mySqlType).copy(isNullable);
}
+ private List<String> extractPrimaryKeys() {
+ List<String> primaryKeys = new ArrayList<>();
+ ArrayNode pkNames = (ArrayNode) root.get(FIELD_PRIMARY_KEYS);
+ pkNames.iterator().forEachRemaining(pk -> primaryKeys.add(toFieldName(pk.asText())));
+ return primaryKeys;
+ }
+
private LinkedHashMap<String, String> extractFieldTypesFromMySqlType() {
LinkedHashMap<String, String> fieldTypes = new LinkedHashMap<>();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index f8ec80618..71de9e336 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -27,6 +27,7 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -39,18 +40,6 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeRefe
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import com.alibaba.druid.sql.SQLUtils;
-import com.alibaba.druid.sql.ast.SQLDataType;
-import com.alibaba.druid.sql.ast.SQLExpr;
-import com.alibaba.druid.sql.ast.SQLName;
-import com.alibaba.druid.sql.ast.SQLStatement;
-import com.alibaba.druid.sql.ast.expr.SQLCharExpr;
-import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
-import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
-import com.alibaba.druid.sql.ast.statement.SQLTableElement;
-import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
-import com.alibaba.druid.util.JdbcConstants;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,7 +53,6 @@ import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -81,29 +69,45 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
private final boolean caseSensitive;
private final TableNameConverter tableNameConverter;
private final List<ComputedColumn> computedColumns;
+ private final NewTableSchemaBuilder<String> schemaBuilder;
private JsonNode root;
private JsonNode payload;
public MySqlDebeziumJsonEventParser(
ZoneId serverTimeZone, boolean caseSensitive, List<ComputedColumn> computedColumns) {
- this(serverTimeZone, caseSensitive, computedColumns, new TableNameConverter(caseSensitive));
+ this(
+ serverTimeZone,
+ caseSensitive,
+ computedColumns,
+ new TableNameConverter(caseSensitive),
+ ddl -> Optional.empty());
}
public MySqlDebeziumJsonEventParser(
- ZoneId serverTimeZone, boolean caseSensitive, TableNameConverter tableNameConverter) {
- this(serverTimeZone, caseSensitive, Collections.emptyList(), tableNameConverter);
+ ZoneId serverTimeZone,
+ boolean caseSensitive,
+ TableNameConverter tableNameConverter,
+ NewTableSchemaBuilder<String> schemaBuilder) {
+ this(
+ serverTimeZone,
+ caseSensitive,
+ Collections.emptyList(),
+ tableNameConverter,
+ schemaBuilder);
}
public MySqlDebeziumJsonEventParser(
ZoneId serverTimeZone,
boolean caseSensitive,
List<ComputedColumn> computedColumns,
- TableNameConverter tableNameConverter) {
+ TableNameConverter tableNameConverter,
+ NewTableSchemaBuilder<String> schemaBuilder) {
this.serverTimeZone = serverTimeZone;
this.caseSensitive = caseSensitive;
this.computedColumns = computedColumns;
this.tableNameConverter = tableNameConverter;
+ this.schemaBuilder = schemaBuilder;
}
@Override
@@ -178,7 +182,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
}
@Override
- public Optional<Schema> parseNewTable(String databaseName) {
+ public Optional<Schema> parseNewTable() {
JsonNode historyRecord = payload.get("historyRecord");
if (historyRecord == null) {
return Optional.empty();
@@ -190,69 +194,13 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
if (Strings.isNullOrEmpty(ddl)) {
return Optional.empty();
}
-
- SQLStatement statement = SQLUtils.parseSingleStatement(ddl, JdbcConstants.MYSQL);
-
- if (!(statement instanceof MySqlCreateTableStatement)) {
- return Optional.empty();
- }
-
- MySqlCreateTableStatement createTableStatement = (MySqlCreateTableStatement) statement;
-
- // TODO: add default table config, partitions, and computed column
- // for newly added table;
- MySqlSchema mySqlSchema = buildMySqlSchema(databaseName, createTableStatement);
- Schema fromMySql =
- MySqlActionUtils.buildPaimonSchema(
- mySqlSchema,
- Collections.emptyList(),
- mySqlSchema.getPrimaryKeys(),
- computedColumns,
- Collections.emptyMap(),
- caseSensitive);
-
- return Optional.of(fromMySql);
-
+ return schemaBuilder.build(ddl);
} catch (Exception e) {
LOG.info("Failed to parse history record for schema changes", e);
return Optional.empty();
}
}
- private MySqlSchema buildMySqlSchema(String database, MySqlCreateTableStatement statement) {
- LinkedHashMap<String, Tuple2<DataType, String>> fields = new LinkedHashMap<>();
-
- List<SQLTableElement> columns = statement.getTableElementList();
- for (SQLTableElement element : columns) {
- if (element instanceof SQLColumnDefinition) {
- SQLColumnDefinition column = (SQLColumnDefinition) element;
- SQLName name = column.getName();
- SQLDataType dataType = column.getDataType();
- List<SQLExpr> arguments = dataType.getArguments();
- Integer precision = null;
- Integer scale = null;
- if (arguments.size() >= 1) {
- precision = (int) (((SQLIntegerExpr) arguments.get(0)).getValue());
- }
-
- if (arguments.size() >= 2) {
- scale = (int) (((SQLIntegerExpr) arguments.get(1)).getValue());
- }
-
- SQLCharExpr comment = (SQLCharExpr) column.getComment();
- fields.put(
- name.getSimpleName(),
- Tuple2.of(
- MySqlTypeUtils.toDataType(
- column.getDataType().getName(), precision, scale),
- comment == null ? null : String.valueOf(comment.getValue())));
- }
- }
-
- return new MySqlSchema(
- database, statement.getTableName(), fields, statement.getPrimaryKeyNames());
- }
-
@Override
public List<CdcRecord> parseRecords() {
if (isSchemaChange()) {
@@ -378,7 +326,7 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
} else if ("io.debezium.time.ZonedTimestamp".equals(className)) {
// MySQL timestamp
- // dispaly value of timestamp is affected by timezone, see
+ // display value of timestamp is affected by timezone, see
// https://dev.mysql.com/doc/refman/8.0/en/datetime.html for standard, and
// RowDataDebeziumDeserializeSchema#convertToTimestamp in flink-cdc-connector
// for implementation
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index 5501e25bc..55ffeea1b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -40,17 +40,6 @@ public class MySqlSchema {
private final LinkedHashMap<String, Tuple2<DataType, String>> fields;
private final List<String> primaryKeys;
- public MySqlSchema(
- String databaseName,
- String tableName,
- LinkedHashMap<String, Tuple2<DataType, String>> fields,
- List<String> primaryKeys) {
- this.databaseName = databaseName;
- this.tableName = tableName;
- this.fields = fields;
- this.primaryKeys = primaryKeys;
- }
-
public MySqlSchema(DatabaseMetaData metaData, String databaseName, String tableName)
throws Exception {
this.databaseName = databaseName;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 4bf8e799b..74df7cbac 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
@@ -54,8 +55,8 @@ import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.SEPARATE;
-import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.UNIFIED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.DIVIDED;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -107,7 +108,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
@Nullable private final Pattern excludingPattern;
private final Map<String, String> tableConfig;
private final String includingTables;
- private final MySqlDatabaseSyncMode mode;
+ private final DatabaseSyncMode mode;
public MySqlSyncDatabaseAction(
Map<String, String> mySqlConfig,
@@ -127,7 +128,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
null,
catalogConfig,
tableConfig,
- SEPARATE);
+ DIVIDED);
}
public MySqlSyncDatabaseAction(
@@ -141,7 +142,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
@Nullable String excludingTables,
Map<String, String> catalogConfig,
Map<String, String> tableConfig,
- MySqlDatabaseSyncMode mode) {
+ DatabaseSyncMode mode) {
super(warehouse, catalogConfig);
this.mySqlConfig = Configuration.fromMap(mySqlConfig);
this.database = database;
@@ -217,10 +218,10 @@ public class MySqlSyncDatabaseAction extends ActionBase {
+ "MySQL database are not compatible with those of existed Paimon tables. Please check the log.");
String tableList;
- if (mode == UNIFIED) {
- // First excluding all tables that failed the excludingPattern and those does not
- // have a primary key. Then including other table using regex so that newly
- // added table DDLs and DMLs during job runtime will be captured
+ if (mode == COMBINED) {
+ // First excluding all tables that failed the excludingPattern and don't have primary
+ // keys. Then including other tables using regex so that newly added table DDLs and DMLs
+ // during job runtime can be captured
tableList =
excludedTables.stream()
.map(t -> String.format("(?!(%s))", t))
@@ -234,11 +235,15 @@ public class MySqlSyncDatabaseAction extends ActionBase {
String serverTimeZone = mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() : ZoneId.of(serverTimeZone);
+ MySqlTableSchemaBuilder schemaBuilder =
+ new MySqlTableSchemaBuilder(tableConfig, caseSensitive);
EventParser.Factory<String> parserFactory =
- () -> new MySqlDebeziumJsonEventParser(zoneId, caseSensitive, tableNameConverter);
+ () ->
+ new MySqlDebeziumJsonEventParser(
+ zoneId, caseSensitive, tableNameConverter, schemaBuilder);
String database = this.database;
- MySqlDatabaseSyncMode mode = this.mode;
+ DatabaseSyncMode mode = this.mode;
FlinkCdcSyncDatabaseSinkBuilder<String> sinkBuilder =
new FlinkCdcSyncDatabaseSinkBuilder<String>()
.withInput(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
index 1ccc0c713..0a6723bf7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionFactory.java
@@ -20,14 +20,15 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
+import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import java.util.Map;
import java.util.Optional;
-import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.SEPARATE;
-import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.UNIFIED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.DIVIDED;
/** Factory to create {@link MySqlSyncDatabaseAction}. */
public class MySqlSyncDatabaseActionFactory implements ActionFactory {
@@ -53,11 +54,21 @@ public class MySqlSyncDatabaseActionFactory implements ActionFactory {
String includingTables = params.get("including-tables");
String excludingTables = params.get("excluding-tables");
String mode = params.get("mode");
- MySqlDatabaseSyncMode syncMode;
- if ("unified".equalsIgnoreCase(mode)) {
- syncMode = UNIFIED;
+ DatabaseSyncMode syncMode;
+ if (mode == null) {
+ syncMode = DIVIDED;
} else {
- syncMode = SEPARATE;
+ switch (mode.toLowerCase()) {
+ case "divided":
+ syncMode = DIVIDED;
+ break;
+ case "combined":
+ syncMode = COMBINED;
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported mode '" + mode + "' for database synchronization mode.");
+ }
}
Map<String, String> mySqlConfig = optionalConfigMap(params, "mysql-conf");
@@ -96,6 +107,7 @@ public class MySqlSyncDatabaseActionFactory implements ActionFactory {
+ "[--table-suffix <paimon-table-suffix>] "
+ "[--including-tables <mysql-table-name|name-regular-expr>] "
+ "[--excluding-tables <mysql-table-name|name-regular-expr>] "
+ + "[--mode <sync-mode>] "
+ "[--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] "
+ "[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] "
+ "[--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]");
@@ -123,6 +135,15 @@ public class MySqlSyncDatabaseActionFactory implements ActionFactory {
"--excluding-tables has higher priority than --including-tables if you specified both.");
System.out.println();
+ System.out.println(
+ "--mode is used to specify synchronization mode. You can specify two modes:");
+ System.out.println(
+ " 1. 'divided' (the default mode if you haven't specified one): "
+ + "start a sink for each table, the synchronization of the new table requires restarting the job;");
+ System.out.println(
+ " 2. 'combined': start a single combined sink for all tables, the new table will be automatically synchronized.");
+ System.out.println();
+
System.out.println("MySQL CDC source conf syntax:");
System.out.println(" key=value");
System.out.println(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
new file mode 100644
index 000000000..fb6a47cbc
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataType;
+
+import com.alibaba.druid.sql.SQLUtils;
+import com.alibaba.druid.sql.ast.SQLDataType;
+import com.alibaba.druid.sql.ast.SQLExpr;
+import com.alibaba.druid.sql.ast.SQLName;
+import com.alibaba.druid.sql.ast.SQLStatement;
+import com.alibaba.druid.sql.ast.expr.SQLCharExpr;
+import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
+import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
+import com.alibaba.druid.sql.ast.statement.SQLTableElement;
+import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlCreateTableStatement;
+import com.alibaba.druid.util.JdbcConstants;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Schema builder for MySQL cdc. */
+public class MySqlTableSchemaBuilder implements NewTableSchemaBuilder<String> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MySqlTableSchemaBuilder.class);
+
+ private final Map<String, String> tableConfig;
+ private final boolean caseSensitive;
+
+ public MySqlTableSchemaBuilder(Map<String, String> tableConfig, boolean caseSensitive) {
+ this.tableConfig = tableConfig;
+ this.caseSensitive = caseSensitive;
+ }
+
+ @Override
+ public Optional<Schema> build(String ddl) {
+ SQLStatement statement = SQLUtils.parseSingleStatement(ddl, JdbcConstants.MYSQL);
+
+ if (!(statement instanceof MySqlCreateTableStatement)) {
+ return Optional.empty();
+ }
+
+ MySqlCreateTableStatement createTableStatement = (MySqlCreateTableStatement) statement;
+ String tableName = createTableStatement.getTableName();
+
+ List<String> primaryKeys = createTableStatement.getPrimaryKeyNames();
+ if (primaryKeys.isEmpty()) {
+ LOG.debug(
+ "Didn't find primary keys from MySQL DDL for table '{}'. "
+ + "This table won't be synchronized.",
+ tableName);
+ // TODO: for non-pk tables, we should not handle their records because we haven't
+ // created them here. Fix in 'MySqlDebeziumJsonEventParser'
+ return Optional.empty();
+ }
+
+ List<SQLTableElement> columns = createTableStatement.getTableElementList();
+ LinkedHashMap<String, Tuple2<DataType, String>> fields = new LinkedHashMap<>();
+
+ for (SQLTableElement element : columns) {
+ if (element instanceof SQLColumnDefinition) {
+ SQLColumnDefinition column = (SQLColumnDefinition) element;
+ SQLName name = column.getName();
+ SQLDataType dataType = column.getDataType();
+ List<SQLExpr> arguments = dataType.getArguments();
+ Integer precision = null;
+ Integer scale = null;
+ if (arguments.size() >= 1) {
+ precision = (int) (((SQLIntegerExpr) arguments.get(0)).getValue());
+ }
+
+ if (arguments.size() >= 2) {
+ scale = (int) (((SQLIntegerExpr) arguments.get(1)).getValue());
+ }
+
+ SQLCharExpr comment = (SQLCharExpr) column.getComment();
+ fields.put(
+ name.getSimpleName(),
+ Tuple2.of(
+ MySqlTypeUtils.toDataType(
+ column.getDataType().getName(), precision, scale),
+ comment == null ? null : String.valueOf(comment.getValue())));
+ }
+ }
+
+ if (!caseSensitive) {
+ LinkedHashMap<String, Tuple2<DataType, String>> tmp = new LinkedHashMap<>();
+ for (Map.Entry<String, Tuple2<DataType, String>> entry : fields.entrySet()) {
+ String fieldName = entry.getKey();
+ checkArgument(
+ !tmp.containsKey(fieldName.toLowerCase()),
+ "Duplicate key '%s' in table '%s' appears when converting fields map keys to case-insensitive form.",
+ fieldName,
+ tableName);
+ tmp.put(fieldName.toLowerCase(), entry.getValue());
+ }
+ fields = tmp;
+
+ primaryKeys =
+ primaryKeys.stream().map(String::toLowerCase).collect(Collectors.toList());
+ }
+
+ Schema.Builder builder = Schema.newBuilder();
+ builder.options(tableConfig);
+ for (Map.Entry<String, Tuple2<DataType, String>> entry : fields.entrySet()) {
+ builder.column(entry.getKey(), entry.getValue().f0, entry.getValue().f1);
+ }
+
+ Schema schema = builder.primaryKey(primaryKeys).build();
+
+ return Optional.of(schema);
+ }
+}
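For orientation, a minimal sketch of how this new builder could be exercised on its own; the DDL string, table options and example class are illustrative and not part of the commit:

    package org.apache.paimon.flink.action.cdc.mysql;

    import org.apache.paimon.schema.Schema;

    import java.util.Collections;
    import java.util.Optional;

    class MySqlTableSchemaBuilderExample {
        public static void main(String[] args) {
            // A CREATE TABLE statement as it might be forwarded by the MySQL CDC source.
            String ddl =
                    "CREATE TABLE orders ("
                            + " id INT NOT NULL,"
                            + " name VARCHAR(32) COMMENT 'customer name',"
                            + " PRIMARY KEY (id))";
            MySqlTableSchemaBuilder builder =
                    new MySqlTableSchemaBuilder(Collections.emptyMap(), true);
            // Tables without a primary key yield Optional.empty() and are not created.
            Optional<Schema> schema = builder.build(ddl);
            schema.ifPresent(System.out::println);
        }
    }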
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 556be5762..4f2e4a3b9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -87,7 +87,7 @@ public class CdcDynamicTableParsingProcessFunction<T> extends ProcessFunction<T,
String tableName = parser.parseTableName();
// check for newly added table
- parser.parseNewTable(database)
+ parser.parseNewTable()
.ifPresent(
schema -> {
Identifier identifier =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
index e62864ee9..c0e481012 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
@@ -59,10 +59,9 @@ public interface EventParser<T> {
/**
* Parse newly added table schema from event.
*
- * @param databaseName database of the new table
* @return empty if there is no newly added table
*/
- default Optional<Schema> parseNewTable(String databaseName) {
+ default Optional<Schema> parseNewTable() {
return Optional.empty();
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index 6ba1278a1..0070f3ae9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode;
+import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
import org.apache.paimon.schema.SchemaManager;
@@ -37,7 +37,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.UNIFIED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
/**
@@ -68,7 +68,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
private Catalog.Loader catalogLoader;
// database to sync, currently only support single database
private String database;
- private MySqlDatabaseSyncMode mode;
+ private DatabaseSyncMode mode;
public FlinkCdcSyncDatabaseSinkBuilder<T> withInput(DataStream<T> input) {
this.input = input;
@@ -101,7 +101,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
return this;
}
- public FlinkCdcSyncDatabaseSinkBuilder<T> withMode(MySqlDatabaseSyncMode mode) {
+ public FlinkCdcSyncDatabaseSinkBuilder<T> withMode(DatabaseSyncMode mode) {
this.mode = mode;
return this;
}
@@ -112,14 +112,14 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
Preconditions.checkNotNull(database);
Preconditions.checkNotNull(catalogLoader);
- if (mode == UNIFIED) {
- buildUnifiedCdcSink();
+ if (mode == COMBINED) {
+ buildCombinedCdcSink();
} else {
- buildStaticCdcSink();
+ buildDividedCdcSink();
}
}
- private void buildUnifiedCdcSink() {
+ private void buildCombinedCdcSink() {
SingleOutputStreamOperator<Void> parsed =
input.forward()
.process(
@@ -162,7 +162,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
new FlinkCdcSink(table).sinkFrom(partitioned);
}
- private void buildStaticCdcSink() {
+ private void buildDividedCdcSink() {
Preconditions.checkNotNull(tables);
SingleOutputStreamOperator<Void> parsed =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
similarity index 67%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
index a7eccba66..0ceb820ff 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDatabaseSyncMode.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
@@ -16,19 +16,16 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.action.cdc.mysql;
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.schema.Schema;
import java.io.Serializable;
+import java.util.Optional;
-/**
- * There are two modes for database sync.
- *
- * <p>1) SEPARATE mode, start a sink for each table, the synchronization of the new table requires
- * restarting the job.
- *
- * <p>2) UNIFIED mode, start a unified sink, the new table will be automatically synchronized.
- */
-public enum MySqlDatabaseSyncMode implements Serializable {
- SEPARATE,
- UNIFIED
+/** Build table schema for newly added table in CDC ingestion. */
+@FunctionalInterface
+public interface NewTableSchemaBuilder<T> extends Serializable {
+
+ Optional<Schema> build(T source);
}
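Because the new interface is a @FunctionalInterface, callers that do not need dynamic table creation can supply a no-op builder as a lambda; a minimal sketch (the class and field names are illustrative):

    package org.apache.paimon.flink.sink.cdc;

    import java.util.Optional;

    class NoOpSchemaBuilderExample {
        // Never propose a schema, so no new tables are ever created.
        static final NewTableSchemaBuilder<String> NO_OP = source -> Optional.empty();
    }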
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
index c8d09f4d0..0b1fa30aa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
@@ -22,6 +22,7 @@ import org.apache.paimon.types.DataType;
import java.io.Serializable;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Objects;
/** Compared to {@link CdcMultiplexRecord}, this contains schema information. */
@@ -29,33 +30,44 @@ public class RichCdcMultiplexRecord implements Serializable {
private static final long serialVersionUID = 1L;
- private final CdcRecord cdcRecord;
- private final LinkedHashMap<String, DataType> fieldTypes;
private final String databaseName;
private final String tableName;
+ private final LinkedHashMap<String, DataType> fieldTypes;
+ private final List<String> primaryKeys;
+ private final CdcRecord cdcRecord;
public RichCdcMultiplexRecord(
- CdcRecord cdcRecord,
- LinkedHashMap<String, DataType> fieldTypes,
String databaseName,
- String tableName) {
- this.cdcRecord = cdcRecord;
- this.fieldTypes = fieldTypes;
+ String tableName,
+ LinkedHashMap<String, DataType> fieldTypes,
+ List<String> primaryKeys,
+ CdcRecord cdcRecord) {
this.databaseName = databaseName;
this.tableName = tableName;
+ this.fieldTypes = fieldTypes;
+ this.primaryKeys = primaryKeys;
+ this.cdcRecord = cdcRecord;
}
public String tableName() {
return tableName;
}
+ public LinkedHashMap<String, DataType> fieldTypes() {
+ return fieldTypes;
+ }
+
+ public List<String> primaryKeys() {
+ return primaryKeys;
+ }
+
public RichCdcRecord toRichCdcRecord() {
return new RichCdcRecord(cdcRecord, fieldTypes);
}
@Override
public int hashCode() {
- return Objects.hash(cdcRecord, fieldTypes, databaseName, tableName);
+ return Objects.hash(databaseName, tableName, fieldTypes, primaryKeys, cdcRecord);
}
@Override
@@ -67,23 +79,26 @@ public class RichCdcMultiplexRecord implements Serializable {
return false;
}
RichCdcMultiplexRecord that = (RichCdcMultiplexRecord) o;
- return Objects.equals(cdcRecord, that.cdcRecord)
+ return databaseName.equals(that.databaseName)
+ && tableName.equals(that.tableName)
&& Objects.equals(fieldTypes, that.fieldTypes)
- && databaseName.equals(that.databaseName)
- && tableName.equals(that.tableName);
+ && Objects.equals(primaryKeys, that.primaryKeys)
+ && Objects.equals(cdcRecord, that.cdcRecord);
}
@Override
public String toString() {
return "{"
- + "cdcRecord="
- + cdcRecord
- + ", fieldTypes="
- + fieldTypes
- + ", databaseName="
+ + "databaseName="
+ databaseName
+ ", tableName="
+ tableName
+ + ", fieldTypes="
+ + fieldTypes
+ + ", primaryKeys="
+ + primaryKeys
+ + ", cdcRecord="
+ + cdcRecord
+ '}';
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 105f45be4..326dad0d4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -18,26 +18,64 @@
package org.apache.paimon.flink.sink.cdc;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Pattern;
/** {@link EventParser} for {@link RichCdcMultiplexRecord}. */
public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMultiplexRecord> {
- // TODO: currently we don't consider database
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RichCdcMultiplexRecordEventParser.class);
+
+ private final NewTableSchemaBuilder<RichCdcMultiplexRecord> schemaBuilder;
+ @Nullable private final Pattern includingPattern;
+ @Nullable private final Pattern excludingPattern;
+ private final Map<String, RichEventParser> parsers = new HashMap<>();
+ private final Set<String> includedTables = new HashSet<>();
+ private final Set<String> excludedTables = new HashSet<>();
+ private final Set<String> createdTables = new HashSet<>();
+
+ private RichCdcMultiplexRecord record;
private String currentTable;
+ private boolean shouldSynchronizeCurrentTable;
private RichEventParser currentParser;
- private final Map<String, RichEventParser> parsers = new HashMap<>();
+ public RichCdcMultiplexRecordEventParser() {
+ this(record -> Optional.empty(), null, null);
+ }
+
+ public RichCdcMultiplexRecordEventParser(
+ NewTableSchemaBuilder<RichCdcMultiplexRecord> schemaBuilder,
+ @Nullable Pattern includingPattern,
+ @Nullable Pattern excludingPattern) {
+ this.schemaBuilder = schemaBuilder;
+ this.includingPattern = includingPattern;
+ this.excludingPattern = excludingPattern;
+ }
@Override
public void setRawEvent(RichCdcMultiplexRecord record) {
+ this.record = record;
this.currentTable = record.tableName();
- this.currentParser = parsers.computeIfAbsent(currentTable, t -> new RichEventParser());
- currentParser.setRawEvent(record.toRichCdcRecord());
+ this.shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable(record.primaryKeys());
+ if (shouldSynchronizeCurrentTable) {
+ this.currentParser = parsers.computeIfAbsent(currentTable, t -> new RichEventParser());
+ this.currentParser.setRawEvent(record.toRichCdcRecord());
+ }
}
@Override
@@ -47,11 +85,65 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
@Override
public List<DataField> parseSchemaChange() {
- return currentParser.parseSchemaChange();
+ return shouldSynchronizeCurrentTable
+ ? currentParser.parseSchemaChange()
+ : Collections.emptyList();
}
@Override
public List<CdcRecord> parseRecords() {
- return currentParser.parseRecords();
+ return shouldSynchronizeCurrentTable
+ ? currentParser.parseRecords()
+ : Collections.emptyList();
+ }
+
+ @Override
+ public Optional<Schema> parseNewTable() {
+ if (shouldCreateCurrentTable()) {
+ return schemaBuilder.build(record);
+ }
+
+ return Optional.empty();
+ }
+
+ private boolean shouldSynchronizeCurrentTable(List<String> primaryKeys) {
+ if (includedTables.contains(currentTable)) {
+ return true;
+ }
+ if (excludedTables.contains(currentTable)) {
+ return false;
+ }
+
+        boolean shouldSynchronize = true;
+        if (includingPattern != null) {
+            shouldSynchronize = includingPattern.matcher(currentTable).matches();
+        }
+        if (excludingPattern != null) {
+            shouldSynchronize =
+                    shouldSynchronize && !excludingPattern.matcher(currentTable).matches();
+        }
+        if (!shouldSynchronize) {
+            LOG.debug(
+                    "Source table {} won't be synchronized because it was excluded.",
+                    currentTable);
+ excludedTables.add(currentTable);
+ return false;
+ }
+
+ if (primaryKeys.isEmpty()) {
+ LOG.debug(
+                    "Didn't find primary keys from Kafka topic's table schemas for table '{}'. "
+ + "This table won't be synchronized.",
+ currentTable);
+ excludedTables.add(currentTable);
+ return false;
+ }
+
+ includedTables.add(currentTable);
+ return true;
+ }
+
+ private boolean shouldCreateCurrentTable() {
+ return shouldSynchronizeCurrentTable && createdTables.add(currentTable);
}
}
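A minimal sketch of wiring the new filtering constructor; the patterns and the example class are illustrative only:

    package org.apache.paimon.flink.sink.cdc;

    import java.util.Optional;
    import java.util.regex.Pattern;

    class EventParserWiringExample {
        public static void main(String[] args) {
            // Synchronize only tables matching "orders.*", but never "orders_tmp".
            // A no-op schema builder is used here; RichCdcMultiplexRecordSchemaBuilder
            // (next file) can instead build new tables from the record's field types.
            RichCdcMultiplexRecordEventParser parser =
                    new RichCdcMultiplexRecordEventParser(
                            record -> Optional.empty(),
                            Pattern.compile("orders.*"),
                            Pattern.compile("orders_tmp"));
        }
    }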
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
similarity index 52%
copy from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
index e39a74d5c..9b83b6d7e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
@@ -19,40 +19,32 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.utils.ObjectUtils;
+import org.apache.paimon.types.DataType;
-import java.util.Collections;
-import java.util.List;
+import java.util.Map;
import java.util.Optional;
-/** Testing {@link EventParser} for {@link TestCdcEvent}. */
-public class TestCdcEventParser implements EventParser<TestCdcEvent> {
+/** Schema builder for {@link RichCdcMultiplexRecord}. */
+public class RichCdcMultiplexRecordSchemaBuilder
+ implements NewTableSchemaBuilder<RichCdcMultiplexRecord> {
- private TestCdcEvent raw;
+ private final Map<String, String> tableConfig;
- @Override
- public void setRawEvent(TestCdcEvent raw) {
- this.raw = raw;
+ public RichCdcMultiplexRecordSchemaBuilder(Map<String, String> tableConfig) {
+ this.tableConfig = tableConfig;
}
@Override
- public String parseTableName() {
- return raw.tableName();
- }
+ public Optional<Schema> build(RichCdcMultiplexRecord record) {
+ Schema.Builder builder = Schema.newBuilder();
+ builder.options(tableConfig);
- @Override
- public List<DataField> parseSchemaChange() {
- return ObjectUtils.coalesce(raw.updatedDataFields(), Collections.emptyList());
- }
+ for (Map.Entry<String, DataType> entry : record.fieldTypes().entrySet()) {
+ builder.column(entry.getKey(), entry.getValue(), null);
+ }
- @Override
- public List<CdcRecord> parseRecords() {
- return ObjectUtils.coalesce(raw.records(), Collections.emptyList());
- }
+ Schema schema = builder.primaryKey(record.primaryKeys()).build();
- @Override
- public Optional<Schema> parseNewTable(String databaseName) {
- return Optional.empty();
+ return Optional.of(schema);
}
}
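A minimal sketch of deriving the schema for a newly discovered table with this builder; the helper method, table option and example class are illustrative (the record itself normally comes from the upstream Kafka parsers):

    package org.apache.paimon.flink.sink.cdc;

    import org.apache.paimon.schema.Schema;

    import java.util.Collections;
    import java.util.Optional;

    class RichCdcSchemaBuilderExample {
        // Columns come from record.fieldTypes(), primary keys from record.primaryKeys().
        static Optional<Schema> schemaFor(RichCdcMultiplexRecord record) {
            RichCdcMultiplexRecordSchemaBuilder builder =
                    new RichCdcMultiplexRecordSchemaBuilder(
                            Collections.singletonMap("bucket", "4"));
            return builder.build(record);
        }
    }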
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 580240773..2582cd5c0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -18,7 +18,12 @@
package org.apache.paimon.flink.action.cdc.kafka;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;
@@ -363,4 +368,13 @@ public abstract class KafkaActionITCaseBase extends ActionITCaseBase {
Thread.sleep(1000);
}
}
+
+ protected Catalog catalog() {
+ return CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+ }
+
+ protected FileStoreTable getFileStoreTable(String tableName) throws Exception {
+ Identifier identifier = Identifier.create(database, tableName);
+ return (FileStoreTable) catalog().getTable(identifier);
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 5add09619..8a06b2df8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CommonTestUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
@@ -37,6 +38,7 @@ import org.junit.jupiter.api.Timeout;
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -96,12 +98,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1));
KafkaSyncDatabaseAction action =
new KafkaSyncDatabaseAction(
- kafkaConfig,
- warehouse,
- database,
- false,
- Collections.emptyMap(),
- tableConfig);
+ kafkaConfig, warehouse, database, Collections.emptyMap(), tableConfig);
action.build(env);
JobClient client = env.executeAsync();
waitJobRunning(client);
@@ -152,12 +149,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
tableConfig.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1));
KafkaSyncDatabaseAction action =
new KafkaSyncDatabaseAction(
- kafkaConfig,
- warehouse,
- database,
- false,
- Collections.emptyMap(),
- tableConfig);
+ kafkaConfig, warehouse, database, Collections.emptyMap(), tableConfig);
action.build(env);
JobClient client = env.executeAsync();
waitJobRunning(client);
@@ -167,6 +159,8 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
private void testSchemaEvolutionImpl(List<String> topics, boolean writeOne, int fileCount)
throws Exception {
+ waitTablesCreated("t1", "t2");
+
FileStoreTable table1 = getFileStoreTable("t1");
FileStoreTable table2 = getFileStoreTable("t2");
@@ -296,7 +290,6 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
kafkaConfig,
warehouse,
database,
- false,
Collections.emptyMap(),
Collections.emptyMap());
@@ -367,8 +360,6 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
kafkaConfig,
warehouse,
database,
- 0,
- false,
"test_prefix_",
"_test_suffix",
null,
@@ -440,8 +431,6 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
kafkaConfig,
warehouse,
database,
- 0,
- false,
"test_prefix_",
"_test_suffix",
null,
@@ -457,6 +446,8 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
private void testTableAffixImpl(List<String> topics, boolean writeOne, int fileCount)
throws Exception {
+ waitTablesCreated("test_prefix_t1_test_suffix", "test_prefix_t2_test_suffix");
+
FileStoreTable table1 = getFileStoreTable("test_prefix_t1_test_suffix");
FileStoreTable table2 = getFileStoreTable("test_prefix_t2_test_suffix");
@@ -631,8 +622,6 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
kafkaConfig,
warehouse,
database,
- 0,
- false,
null,
null,
includingTables,
@@ -644,29 +633,30 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
waitJobRunning(client);
// check paimon tables
- assertTableExists(existedTables);
+ waitTablesCreated(existedTables.toArray(new String[0]));
assertTableNotExists(notExistedTables);
}
- private FileStoreTable getFileStoreTable(String tableName) throws Exception {
- Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
- Identifier identifier = Identifier.create(database, tableName);
- return (FileStoreTable) catalog.getTable(identifier);
- }
-
- private void assertTableExists(List<String> tableNames) {
- Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
- for (String tableName : tableNames) {
- Identifier identifier = Identifier.create(database, tableName);
- assertThat(catalog.tableExists(identifier)).isTrue();
- }
- }
-
private void assertTableNotExists(List<String> tableNames) {
- Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+ Catalog catalog = catalog();
for (String tableName : tableNames) {
Identifier identifier = Identifier.create(database, tableName);
assertThat(catalog.tableExists(identifier)).isFalse();
}
}
+
+ private void waitTablesCreated(String... tables) throws Exception {
+ CommonTestUtils.waitUtil(
+ () -> {
+ try {
+ List<String> existed = catalog().listTables(database);
+ return existed.containsAll(Arrays.asList(tables));
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ },
+ Duration.ofSeconds(5),
+ Duration.ofMillis(100),
+ "Failed to wait tables to be created in 5 seconds.");
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 1472021b0..bfbd89fe6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -98,7 +98,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
tableName,
Collections.singletonList("pt"),
Arrays.asList("pt", "_id"),
- Collections.EMPTY_LIST,
+ Collections.emptyList(),
Collections.emptyMap(),
tableConfig);
action.build(env);
@@ -110,7 +110,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
}
private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exception {
- FileStoreTable table = getFileStoreTable();
+ FileStoreTable table = getFileStoreTable(tableName);
RowType rowType =
RowType.of(
@@ -271,7 +271,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
tableName,
Collections.emptyList(),
Collections.singletonList("_id"),
- Collections.EMPTY_LIST,
+ Collections.emptyList(),
Collections.emptyMap(),
Collections.emptyMap());
action.build(env);
@@ -283,7 +283,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
}
private void testSchemaEvolutionMultipleImpl(String topic) throws Exception {
- FileStoreTable table = getFileStoreTable();
+ FileStoreTable table = getFileStoreTable(tableName);
RowType rowType =
RowType.of(
@@ -533,7 +533,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
"_geometrycollection",
"_set",
});
- FileStoreTable table = getFileStoreTable();
+ FileStoreTable table = getFileStoreTable(tableName);
List<String> expected =
Arrays.asList(
"+I["
@@ -637,8 +637,6 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
waitForResult(expected, table, rowType, Arrays.asList("pt", "_id"));
SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
schemaManager.commitChanges(SchemaChange.dropColumn("v"));
- List<DataField> fields = table.schema().fields();
- int size = fields.size();
}
}
@@ -675,7 +673,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
tableName,
Collections.singletonList("pt"),
Arrays.asList("pt", "_id"),
- Collections.EMPTY_LIST,
+ Collections.emptyList(),
Collections.emptyMap(),
tableConfig);
UnsupportedOperationException e =
@@ -719,7 +717,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
tableName,
Collections.singletonList("pt"),
Arrays.asList("pt", "_id"),
- Collections.EMPTY_LIST,
+ Collections.emptyList(),
Collections.emptyMap(),
tableConfig);
Exception e = assertThrows(Exception.class, () -> action.build(env), "Expecting Exception");
@@ -771,7 +769,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
tableName,
Collections.singletonList("pt"),
Arrays.asList("pt", "_id"),
- Collections.EMPTY_LIST,
+ Collections.emptyList(),
Collections.emptyMap(),
tableConfig);
IllegalArgumentException e =
@@ -821,7 +819,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
tableName,
Collections.singletonList("pt"),
Arrays.asList("pt", "_id"),
- Collections.EMPTY_LIST,
+ Collections.emptyList(),
Collections.emptyMap(),
tableConfig);
action.build(env);
@@ -829,7 +827,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
waitJobRunning(client);
- FileStoreTable table = getFileStoreTable();
+ FileStoreTable table = getFileStoreTable(tableName);
RowType rowType =
RowType.of(
@@ -879,7 +877,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
tableName,
Collections.singletonList("pt"),
Arrays.asList("pt", "_id"),
- Collections.EMPTY_LIST,
+ Collections.emptyList(),
Collections.emptyMap(),
tableConfig);
action.build(env);
@@ -892,7 +890,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
} catch (Exception e) {
throw new Exception("Failed to write canal data to Kafka.", e);
}
- FileStoreTable table = getFileStoreTable();
+ FileStoreTable table = getFileStoreTable(tableName);
RowType rowType =
RowType.of(
@@ -944,7 +942,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
tableName,
Collections.singletonList("pt"),
Arrays.asList("pt", "_id"),
- Collections.EMPTY_LIST,
+ Collections.emptyList(),
Collections.emptyMap(),
tableConfig);
action.build(env);
@@ -957,7 +955,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
} catch (Exception e) {
throw new Exception("Failed to write canal data to Kafka.", e);
}
- FileStoreTable table = getFileStoreTable();
+ FileStoreTable table = getFileStoreTable(tableName);
RowType rowType =
RowType.of(
@@ -1007,7 +1005,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
tableName,
Collections.singletonList("pt"),
Arrays.asList("pt", "_id"),
- Collections.EMPTY_LIST,
+ Collections.emptyList(),
Collections.emptyMap(),
tableConfig);
action.build(env);
@@ -1020,7 +1018,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
} catch (Exception e) {
throw new Exception("Failed to write canal data to Kafka.", e);
}
- FileStoreTable table = getFileStoreTable();
+ FileStoreTable table = getFileStoreTable(tableName);
RowType rowType =
RowType.of(
@@ -1072,7 +1070,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
tableName,
Collections.singletonList("pt"),
Arrays.asList("pt", "_id"),
- Collections.EMPTY_LIST,
+ Collections.emptyList(),
Collections.emptyMap(),
tableConfig);
action.build(env);
@@ -1085,7 +1083,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
} catch (Exception e) {
throw new Exception("Failed to write canal data to Kafka.", e);
}
- FileStoreTable table = getFileStoreTable();
+ FileStoreTable table = getFileStoreTable(tableName);
RowType rowType =
RowType.of(
@@ -1102,10 +1100,4 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
"+I[1, 1, one]", "+I[1, 2, two]", "+I[1, 3, three]", "+I[1, 4, four]");
waitForResult(expected, table, rowType, primaryKeys);
}
-
- private FileStoreTable getFileStoreTable() throws Exception {
- Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
- Identifier identifier = Identifier.create(database, tableName);
- return (FileStoreTable) catalog.getTable(identifier);
- }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 0e3143a0a..07aaebfcc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -59,8 +59,8 @@ import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
-import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.SEPARATE;
-import static org.apache.paimon.flink.action.cdc.mysql.MySqlDatabaseSyncMode.UNIFIED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.COMBINED;
+import static org.apache.paimon.flink.action.cdc.DatabaseSyncMode.DIVIDED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -374,7 +374,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
null,
Collections.emptyMap(),
tableConfig,
- SEPARATE);
+ DIVIDED);
action.build(env);
JobClient client = env.executeAsync();
waitJobRunning(client);
@@ -549,7 +549,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
excludingTables,
Collections.emptyMap(),
tableConfig,
- SEPARATE);
+ DIVIDED);
action.build(env);
JobClient client = env.executeAsync();
waitJobRunning(client);
@@ -924,7 +924,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
null,
catalogConfig,
tableConfig,
- UNIFIED);
+ COMBINED);
action.build(env);
if (Objects.nonNull(savepointPath)) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
index e39a74d5c..204f8536d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
@@ -18,13 +18,11 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.ObjectUtils;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
/** Testing {@link EventParser} for {@link TestCdcEvent}. */
public class TestCdcEventParser implements EventParser<TestCdcEvent> {
@@ -50,9 +48,4 @@ public class TestCdcEventParser implements EventParser<TestCdcEvent> {
public List<CdcRecord> parseRecords() {
return ObjectUtils.coalesce(raw.records(), Collections.emptyList());
}
-
- @Override
- public Optional<Schema> parseNewTable(String databaseName) {
- return Optional.empty();
- }
}