Posted to commits@paimon.apache.org by lz...@apache.org on 2023/07/11 13:23:39 UTC
[incubator-paimon] branch master updated: [flink] Kafka canal CDC should recognize schema change when ddl record is missing (#1526)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ac5d8d143 [flink] Kafka canal CDC should recognize schema change when ddl record is missing (#1526)
ac5d8d143 is described below
commit ac5d8d1438a447a07a64f8f4644c5cd504e421cd
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Tue Jul 11 21:23:35 2023 +0800
[flink] Kafka canal CDC should recognize schema change when ddl record is missing (#1526)
---
.../java/org/apache/paimon/utils/StringUtils.java | 4 +
.../flink/action/cdc/kafka/KafkaActionUtils.java | 2 +-
.../action/cdc/kafka/KafkaSyncDatabaseAction.java | 20 +-
.../action/cdc/kafka/KafkaSyncTableAction.java | 19 +-
.../cdc/kafka/canal/CanalJsonEventParser.java | 332 -------------------
.../action/cdc/kafka/canal/CanalRecordParser.java | 352 +++++++++++++++++++++
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 6 -
.../apache/paimon/flink/sink/cdc/CdcRecord.java | 5 +
.../apache/paimon/flink/sink/cdc/EventParser.java | 4 -
.../flink/sink/cdc/RichCdcMultiplexRecord.java | 89 ++++++
.../cdc/RichCdcMultiplexRecordEventParser.java} | 38 +--
.../flink/sink/cdc/RichCdcParserFactory.java | 70 ----
.../paimon/flink/sink/cdc/RichCdcRecord.java | 45 +--
.../paimon/flink/sink/cdc/RichCdcSinkBuilder.java | 2 +-
.../paimon/flink/sink/cdc/RichEventParser.java | 65 ++++
.../cdc/kafka/KafkaCanalSyncTableActionITCase.java | 29 +-
.../action/cdc/kafka/KafkaEventParserTest.java | 279 ----------------
.../paimon/flink/sink/cdc/TestCdcEventParser.java | 5 -
.../schemaevolutionmissingddl/canal-data-1.txt | 20 ++
.../schemaevolutionmissingddl/canal-data-2.txt | 21 ++
.../schemaevolutionmissingddl/canal-data-3.txt | 22 ++
.../schemaevolutionmissingddl/canal-data-4.txt | 20 ++
.../schemaevolutionmissingddl/canal-data-5.txt | 20 ++
23 files changed, 702 insertions(+), 767 deletions(-)
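Taken together, the changes replace the monolithic CanalJsonEventParser (which parsed JSON strings inside the sink and relied on DDL records for schema changes) with a two-stage pipeline: CanalRecordParser runs as a flatMap right after the Kafka source and converts each canal-json message into RichCdcMultiplexRecord values that carry their own field types, and RichCdcMultiplexRecordEventParser then derives schema changes by comparing those types against what it has already seen, so evolution is recognized even when the DDL record never arrives. A minimal sketch of the new wiring, with names taken from the hunks below (env and source are assumed to be a configured StreamExecutionEnvironment and KafkaSource<String>):

    // Stage 1: parse canal-json into self-describing records.
    DataStream<RichCdcMultiplexRecord> records =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                    .flatMap(new CanalRecordParser(caseSensitive, tableNameConverter));
    // Stage 2: the sink-side EventParser diffs each record's field types against
    // the previously seen schema, so no DDL event is required for evolution.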
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index 745a1c75a..d319ad642 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -535,4 +535,8 @@ public class StringUtils {
}
return true;
}
+
+ public static String caseSensitiveConversion(String str, boolean caseSensitive) {
+ return caseSensitive ? str : str.toLowerCase();
+ }
}
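The new helper just centralizes the `caseSensitive ? s : s.toLowerCase()` pattern that the parser previously inlined at every column-name site. For illustration:

    StringUtils.caseSensitiveConversion("MyColumn", true);  // "MyColumn"
    StringUtils.caseSensitiveConversion("MyColumn", false); // "mycolumn"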
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 43bf5d8c7..148e3ced2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -158,7 +158,7 @@ class KafkaActionUtils {
}
static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
- KafkaSourceBuilder kafkaSourceBuilder = KafkaSource.builder();
+ KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();
String groupId = kafkaConfig.get(KafkaConnectorOptions.PROPS_GROUP_ID);
kafkaSourceBuilder
.setTopics(kafkaConfig.get(KafkaConnectorOptions.TOPIC))
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 3f4a433fa..07b20c523 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -24,9 +24,11 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
-import org.apache.paimon.flink.action.cdc.kafka.canal.CanalJsonEventParser;
+import org.apache.paimon.flink.action.cdc.kafka.canal.CanalRecordParser;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
@@ -212,18 +214,24 @@ public class KafkaSyncDatabaseAction extends ActionBase {
kafkaConfig.set(KafkaConnectorOptions.TOPIC, monitoredTopics);
KafkaSource<String> source = KafkaActionUtils.buildKafkaSource(kafkaConfig);
- EventParser.Factory<String> parserFactory;
+ EventParser.Factory<RichCdcMultiplexRecord> parserFactory;
String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
if ("canal-json".equals(format)) {
- parserFactory = () -> new CanalJsonEventParser(caseSensitive, tableNameConverter);
+ parserFactory = RichCdcMultiplexRecordEventParser::new;
} else {
throw new UnsupportedOperationException("This format: " + format + " is not supported.");
}
- FlinkCdcSyncDatabaseSinkBuilder<String> sinkBuilder =
- new FlinkCdcSyncDatabaseSinkBuilder<String>()
+
+ FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
+ new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
.withInput(
env.fromSource(
- source, WatermarkStrategy.noWatermarks(), "Kafka Source"))
+ source,
+ WatermarkStrategy.noWatermarks(),
+ "Kafka Source")
+ .flatMap(
+ new CanalRecordParser(
+ caseSensitive, tableNameConverter)))
.withParserFactory(parserFactory)
.withTables(fileStoreTables)
.withCatalogLoader(catalogLoader())
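Note that the parser factory no longer needs constructor arguments: caseSensitive and the TableNameConverter moved upstream into the CanalRecordParser flatMap, so a plain constructor reference satisfies EventParser.Factory. A sketch (create() is the factory method, as implied by the deleted RichCdcParserFactory further below):

    EventParser.Factory<RichCdcMultiplexRecord> parserFactory =
            RichCdcMultiplexRecordEventParser::new;
    EventParser<RichCdcMultiplexRecord> parser = parserFactory.create(); // invoked on the sink side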
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index 936e03fab..d890eeaa7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -24,9 +24,11 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.kafka.canal.CanalJsonEventParser;
+import org.apache.paimon.flink.action.cdc.kafka.canal.CanalRecordParser;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
@@ -166,18 +168,23 @@ public class KafkaSyncTableAction extends ActionBase {
table = (FileStoreTable) catalog.getTable(identifier);
}
String format = kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT);
- EventParser.Factory<String> parserFactory;
+ EventParser.Factory<RichCdcMultiplexRecord> parserFactory;
if ("canal-json".equals(format)) {
- parserFactory = () -> new CanalJsonEventParser(caseSensitive, computedColumns);
+ parserFactory = RichCdcMultiplexRecordEventParser::new;
} else {
throw new UnsupportedOperationException("This format: " + format + " is not supported.");
}
- CdcSinkBuilder<String> sinkBuilder =
- new CdcSinkBuilder<String>()
+ CdcSinkBuilder<RichCdcMultiplexRecord> sinkBuilder =
+ new CdcSinkBuilder<RichCdcMultiplexRecord>()
.withInput(
env.fromSource(
- source, WatermarkStrategy.noWatermarks(), "Kafka Source"))
+ source,
+ WatermarkStrategy.noWatermarks(),
+ "Kafka Source")
+ .flatMap(
+ new CanalRecordParser(
+ caseSensitive, computedColumns)))
.withParserFactory(parserFactory)
.withTable(table)
.withIdentifier(identifier)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalJsonEventParser.java
deleted file mode 100644
index 7d68597b9..000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalJsonEventParser.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action.cdc.kafka.canal;
-
-import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
-import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
-import org.apache.paimon.flink.sink.cdc.CdcRecord;
-import org.apache.paimon.flink.sink.cdc.EventParser;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.StringUtils;
-
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.NullNode;
-
-import com.alibaba.druid.DbType;
-import com.alibaba.druid.sql.SQLUtils;
-import com.alibaba.druid.sql.ast.SQLStatement;
-import com.alibaba.druid.sql.ast.statement.SQLAlterTableAddColumn;
-import com.alibaba.druid.sql.ast.statement.SQLAlterTableDropColumnItem;
-import com.alibaba.druid.sql.ast.statement.SQLAlterTableItem;
-import com.alibaba.druid.sql.ast.statement.SQLAlterTableStatement;
-import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
-import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlAlterTableChangeColumn;
-import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlAlterTableModifyColumn;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
-/** {@link EventParser} for Canal-json. */
-public class CanalJsonEventParser implements EventParser<String> {
-
- private static final String FIELD_DATA = "data";
- private static final String FIELD_OLD = "old";
- private static final String TYPE = "type";
- private static final String MYSQL_TYPE = "mysqlType";
- private static final String OP_INSERT = "INSERT";
- private static final String OP_UPDATE = "UPDATE";
- private static final String OP_DELETE = "DELETE";
-
- private final ObjectMapper objectMapper = new ObjectMapper();
-
- private JsonNode root;
-
- private final boolean caseSensitive;
- private final TableNameConverter tableNameConverter;
- private final List<ComputedColumn> computedColumns;
-
- public CanalJsonEventParser(boolean caseSensitive, List<ComputedColumn> computedColumns) {
- this(caseSensitive, new TableNameConverter(caseSensitive), computedColumns);
- }
-
- public CanalJsonEventParser(boolean caseSensitive, TableNameConverter tableNameConverter) {
- this(caseSensitive, tableNameConverter, Collections.emptyList());
- }
-
- public CanalJsonEventParser(
- boolean caseSensitive,
- TableNameConverter tableNameConverter,
- List<ComputedColumn> computedColumns) {
- this.caseSensitive = caseSensitive;
- this.tableNameConverter = tableNameConverter;
- this.computedColumns = computedColumns;
- }
-
- @Override
- public void setRawEvent(String rawEvent) {
- try {
- root = objectMapper.readValue(rawEvent, JsonNode.class);
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public String parseTableName() {
- String tableName = root.get("table").asText();
- return tableNameConverter.convert(tableName);
- }
-
- private boolean isSchemaChange() {
- if (root.get("isDdl") == null) {
- return false;
- } else {
- return "true".equals(root.get("isDdl").asText());
- }
- }
-
- @Override
- public List<DataField> parseSchemaChange() {
- if (!isSchemaChange()) {
- return Collections.emptyList();
- }
-
- String sql = root.get("sql").asText();
-
- if (StringUtils.isEmpty(sql)) {
- return Collections.emptyList();
- }
- List<DataField> result = new ArrayList<>();
- int id = 0;
- SQLStatement sqlStatement = SQLUtils.parseSingleStatement(sql, DbType.mysql);
- if (sqlStatement instanceof SQLAlterTableStatement) {
- SQLAlterTableStatement sqlAlterTableStatement = (SQLAlterTableStatement) sqlStatement;
- for (int i = 0; i < sqlAlterTableStatement.getItems().size(); i++) {
- SQLAlterTableItem sqlAlterTableItem = sqlAlterTableStatement.getItems().get(i);
- if (sqlAlterTableItem instanceof SQLAlterTableAddColumn) {
- SQLAlterTableAddColumn sqlAlterTableAddColumn =
- (SQLAlterTableAddColumn) sqlAlterTableItem;
- List<SQLColumnDefinition> columns = sqlAlterTableAddColumn.getColumns();
- for (SQLColumnDefinition column : columns) {
- String columnName = column.getColumnName().replace("`", "");
- String columnType = column.getDataType().toString();
- DataType dataType = MySqlTypeUtils.toDataType(columnType);
- boolean notNull = column.toString().toUpperCase().contains("NOT NULL");
- dataType = notNull ? dataType.notNull() : dataType.nullable();
- result =
- result.stream()
- .filter(dataField -> !dataField.name().equals(columnName))
- .collect(Collectors.toList());
- result.add(
- new DataField(
- id++,
- caseSensitive ? columnName : columnName.toLowerCase(),
- dataType));
- }
- } else if (sqlAlterTableItem instanceof SQLAlterTableDropColumnItem) {
- // ignore
- } else if (sqlAlterTableItem instanceof MySqlAlterTableModifyColumn) {
- MySqlAlterTableModifyColumn mySqlAlterTableModifyColumn =
- (MySqlAlterTableModifyColumn) sqlAlterTableItem;
- SQLColumnDefinition newColumnDefinition =
- mySqlAlterTableModifyColumn.getNewColumnDefinition();
- String columnName = newColumnDefinition.getColumnName().replace("`", "");
- String columnType = newColumnDefinition.getDataType().toString();
- DataType dataType = MySqlTypeUtils.toDataType(columnType);
- boolean notNull =
- newColumnDefinition.toString().toUpperCase().contains("NOT NULL");
- dataType = notNull ? dataType.notNull() : dataType.nullable();
- result.add(
- new DataField(
- id++,
- caseSensitive ? columnName : columnName.toLowerCase(),
- dataType));
-
- } else if (sqlAlterTableItem instanceof MySqlAlterTableChangeColumn) {
- MySqlAlterTableChangeColumn mySqlAlterTableChangeColumn =
- (MySqlAlterTableChangeColumn) sqlAlterTableItem;
- SQLColumnDefinition newColumnDefinition =
- mySqlAlterTableChangeColumn.getNewColumnDefinition();
- String oldColumnName =
- mySqlAlterTableChangeColumn
- .getColumnName()
- .getSimpleName()
- .replace("`", "");
- String columnName = newColumnDefinition.getColumnName().replace("`", "");
- String columnType = newColumnDefinition.getDataType().toString();
- DataType dataType = MySqlTypeUtils.toDataType(columnType);
- boolean notNull =
- newColumnDefinition.toString().toUpperCase().contains("NOT NULL");
- dataType = notNull ? dataType.notNull() : dataType.nullable();
- result =
- result.stream()
- .filter(dataField -> !dataField.name().equals(oldColumnName))
- .collect(Collectors.toList());
- result.add(
- new DataField(
- id++,
- caseSensitive ? columnName : columnName.toLowerCase(),
- dataType));
- }
- }
- }
-
- return result;
- }
-
- @Override
- public List<CdcRecord> parseRecords() {
- if (isSchemaChange()) {
- return Collections.emptyList();
- }
- Preconditions.checkNotNull(
- root.get(TYPE),
- "CanalJsonEventParser only supports canal-json format,"
- + "please make sure that your topic's format is accurate.");
- List<CdcRecord> records = new ArrayList<>();
- String type = root.get(TYPE).asText();
- if (OP_UPDATE.equals(type)) {
- ArrayNode data = (ArrayNode) root.get(FIELD_DATA);
- ArrayNode old =
- root.get(FIELD_OLD) instanceof NullNode
- ? null
- : (ArrayNode) root.get(FIELD_OLD);
- for (int i = 0; i < data.size(); i++) {
- Map<String, String> after = extractRow(data.get(i));
- if (old != null) {
- Map<String, String> before = extractRow(old.get(i));
- // fields in "old" (before) means the fields are changed
- // fields not in "old" (before) means the fields are not changed
- // so we just copy the not changed fields into before
- for (Map.Entry<String, String> entry : after.entrySet()) {
- if (!before.containsKey(entry.getKey())) {
- before.put(entry.getKey(), entry.getValue());
- }
- }
- before = caseSensitive ? before : keyCaseInsensitive(before);
- records.add(new CdcRecord(RowKind.DELETE, before));
- }
- after = caseSensitive ? after : keyCaseInsensitive(after);
- records.add(new CdcRecord(RowKind.INSERT, after));
- }
- } else if (OP_INSERT.equals(type)) {
- ArrayNode data = (ArrayNode) root.get(FIELD_DATA);
- for (int i = 0; i < data.size(); i++) {
- Map<String, String> after = extractRow(data.get(i));
- after = caseSensitive ? after : keyCaseInsensitive(after);
- records.add(new CdcRecord(RowKind.INSERT, after));
- }
- } else if (OP_DELETE.equals(type)) {
- ArrayNode data = (ArrayNode) root.get(FIELD_DATA);
- for (int i = 0; i < data.size(); i++) {
- Map<String, String> after = extractRow(data.get(i));
- after = caseSensitive ? after : keyCaseInsensitive(after);
- records.add(new CdcRecord(RowKind.DELETE, after));
- }
- }
-
- return records;
- }
-
- private Map<String, String> extractRow(JsonNode recordRow) {
- Map<String, String> mySqlFieldTypes = new HashMap<>();
- Preconditions.checkNotNull(
- root.get(MYSQL_TYPE),
- "CanalJsonEventParser only supports canal-json format,"
- + "please make sure that your topic's format is accurate.");
- JsonNode schema = root.get(MYSQL_TYPE);
- Iterator<String> iterator = schema.fieldNames();
- while (iterator.hasNext()) {
- String fieldName = iterator.next();
- String fieldType = schema.get(fieldName).asText();
- mySqlFieldTypes.put(fieldName, fieldType);
- }
-
- Map<String, Object> jsonMap =
- objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
- if (jsonMap == null) {
- return new HashMap<>();
- }
- Map<String, String> resultMap = new HashMap<>();
- for (Map.Entry<String, String> field : mySqlFieldTypes.entrySet()) {
- String fieldName = field.getKey();
- String mySqlType = field.getValue();
- Object objectValue = jsonMap.get(fieldName);
- if (objectValue == null) {
- continue;
- }
-
- String oldValue = objectValue.toString();
- String newValue = oldValue;
- if (MySqlTypeUtils.isSetType(MySqlTypeUtils.getShortType(mySqlType))) {
- newValue = CanalFieldParser.convertSet(newValue, mySqlType);
- } else if (MySqlTypeUtils.isEnumType(MySqlTypeUtils.getShortType(mySqlType))) {
- newValue = CanalFieldParser.convertEnum(newValue, mySqlType);
- } else if (MySqlTypeUtils.isGeoType(MySqlTypeUtils.getShortType(mySqlType))) {
- try {
- byte[] wkb =
- CanalFieldParser.convertGeoType2WkbArray(
- oldValue.getBytes(StandardCharsets.ISO_8859_1));
- newValue = MySqlTypeUtils.convertWkbArray(wkb);
- } catch (Exception e) {
- throw new IllegalArgumentException(
- String.format("Failed to convert %s to geometry JSON.", oldValue), e);
- }
- }
- resultMap.put(fieldName, newValue);
- }
- // generate values of computed columns
- for (ComputedColumn computedColumn : computedColumns) {
- resultMap.put(
- computedColumn.columnName(),
- computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
- }
-
- return resultMap;
- }
-
- private Map<String, String> keyCaseInsensitive(Map<String, String> origin) {
- Map<String, String> keyCaseInsensitive = new HashMap<>();
- for (Map.Entry<String, String> entry : origin.entrySet()) {
- String fieldName = entry.getKey().toLowerCase();
- checkArgument(
- !keyCaseInsensitive.containsKey(fieldName),
- "Duplicate key appears when converting map keys to case-insensitive form. Original map is:\n%s",
- origin);
- keyCaseInsensitive.put(fieldName, entry.getValue());
- }
- return keyCaseInsensitive;
- }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalRecordParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalRecordParser.java
new file mode 100644
index 000000000..6812aaa8a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/canal/CanalRecordParser.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka.canal;
+
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TableNameConverter;
+import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.NullNode;
+
+import com.alibaba.druid.DbType;
+import com.alibaba.druid.sql.SQLUtils;
+import com.alibaba.druid.sql.ast.SQLStatement;
+import com.alibaba.druid.sql.ast.statement.SQLAlterTableAddColumn;
+import com.alibaba.druid.sql.ast.statement.SQLAlterTableDropColumnItem;
+import com.alibaba.druid.sql.ast.statement.SQLAlterTableItem;
+import com.alibaba.druid.sql.ast.statement.SQLAlterTableStatement;
+import com.alibaba.druid.sql.ast.statement.SQLColumnDefinition;
+import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlAlterTableChangeColumn;
+import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlAlterTableModifyColumn;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** Converts a canal-json format string to a list of {@link RichCdcMultiplexRecord}s. */
+public class CanalRecordParser implements FlatMapFunction<String, RichCdcMultiplexRecord> {
+
+ private static final String FIELD_DATABASE = "database";
+ private static final String FIELD_TABLE = "table";
+ private static final String FIELD_SQL = "sql";
+ private static final String FIELD_MYSQL_TYPE = "mysqlType";
+ private static final String FIELD_TYPE = "type";
+ private static final String FIELD_DATA = "data";
+ private static final String FIELD_OLD = "old";
+ private static final String OP_UPDATE = "UPDATE";
+ private static final String OP_INSERT = "INSERT";
+ private static final String OP_DELETE = "DELETE";
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ private final boolean caseSensitive;
+ private final TableNameConverter tableNameConverter;
+ private final List<ComputedColumn> computedColumns;
+
+ private JsonNode root;
+ private String databaseName;
+ private String tableName;
+
+ public CanalRecordParser(boolean caseSensitive, List<ComputedColumn> computedColumns) {
+ this(caseSensitive, new TableNameConverter(caseSensitive), computedColumns);
+ }
+
+ public CanalRecordParser(boolean caseSensitive, TableNameConverter tableNameConverter) {
+ this(caseSensitive, tableNameConverter, Collections.emptyList());
+ }
+
+ public CanalRecordParser(
+ boolean caseSensitive,
+ TableNameConverter tableNameConverter,
+ List<ComputedColumn> computedColumns) {
+ this.caseSensitive = caseSensitive;
+ this.tableNameConverter = tableNameConverter;
+ this.computedColumns = computedColumns;
+ }
+
+ @Override
+ public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) throws Exception {
+ root = objectMapper.readValue(value, JsonNode.class);
+ validateFormat();
+
+ databaseName = root.get(FIELD_DATABASE).asText();
+ tableName = tableNameConverter.convert(root.get(FIELD_TABLE).asText());
+
+ extractRecords().forEach(out::collect);
+ }
+
+ private void validateFormat() {
+ String errorMessageTemplate =
+ "Didn't find '%s' node in json. Only supports canal-json format,"
+ + "please make sure your topic's format is correct.";
+
+ checkNotNull(root.get(FIELD_DATABASE), errorMessageTemplate, FIELD_DATABASE);
+ checkNotNull(root.get(FIELD_TABLE), errorMessageTemplate, FIELD_TABLE);
+ checkNotNull(root.get(FIELD_TYPE), errorMessageTemplate, FIELD_TYPE);
+ checkNotNull(root.get(FIELD_DATA), errorMessageTemplate, FIELD_DATA);
+
+ if (isDdl()) {
+ checkNotNull(root.get(FIELD_SQL), errorMessageTemplate, FIELD_SQL);
+ } else {
+ checkNotNull(root.get(FIELD_MYSQL_TYPE), errorMessageTemplate, FIELD_MYSQL_TYPE);
+ }
+ }
+
+ private boolean isDdl() {
+ return root.get("isDdl") != null && root.get("isDdl").asBoolean();
+ }
+
+ private List<RichCdcMultiplexRecord> extractRecords() {
+ if (isDdl()) {
+ return extractRecordsFromDdl();
+ }
+
+ // extract field types
+ LinkedHashMap<String, String> mySqlFieldTypes = extractFieldTypesFromMySqlType();
+ LinkedHashMap<String, DataType> paimonFieldTypes = new LinkedHashMap<>();
+ mySqlFieldTypes.forEach(
+ (name, type) -> paimonFieldTypes.put(name, toPaimonDataType(type, true)));
+
+ // extract row kind and field values
+ List<RichCdcMultiplexRecord> records = new ArrayList<>();
+ String type = root.get(FIELD_TYPE).asText();
+ ArrayNode data = (ArrayNode) root.get(FIELD_DATA);
+ switch (type) {
+ case OP_UPDATE:
+ ArrayNode old =
+ root.get(FIELD_OLD) instanceof NullNode
+ ? null
+ : (ArrayNode) root.get(FIELD_OLD);
+ for (int i = 0; i < data.size(); i++) {
+ Map<String, String> after = extractRow(data.get(i), mySqlFieldTypes);
+ if (old != null) {
+ Map<String, String> before = extractRow(old.get(i), mySqlFieldTypes);
+ // fields in "old" (before) means the fields are changed
+ // fields not in "old" (before) means the fields are not changed,
+ // so we just copy the not changed fields into before
+ for (Map.Entry<String, String> entry : after.entrySet()) {
+ if (!before.containsKey(entry.getKey())) {
+ before.put(entry.getKey(), entry.getValue());
+ }
+ }
+ before = caseSensitive ? before : keyCaseInsensitive(before);
+ records.add(
+ new RichCdcMultiplexRecord(
+ new CdcRecord(RowKind.DELETE, before),
+ paimonFieldTypes,
+ databaseName,
+ tableName));
+ }
+ after = caseSensitive ? after : keyCaseInsensitive(after);
+ records.add(
+ new RichCdcMultiplexRecord(
+ new CdcRecord(RowKind.INSERT, after),
+ paimonFieldTypes,
+ databaseName,
+ tableName));
+ }
+ break;
+ case OP_INSERT:
+ // fall through
+ case OP_DELETE:
+ for (JsonNode datum : data) {
+ Map<String, String> after = extractRow(datum, mySqlFieldTypes);
+ after = caseSensitive ? after : keyCaseInsensitive(after);
+ RowKind kind = type.equals(OP_INSERT) ? RowKind.INSERT : RowKind.DELETE;
+ records.add(
+ new RichCdcMultiplexRecord(
+ new CdcRecord(kind, after),
+ paimonFieldTypes,
+ databaseName,
+ tableName));
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown record type: " + type);
+ }
+
+ return records;
+ }
+
+ private List<RichCdcMultiplexRecord> extractRecordsFromDdl() {
+ String sql = root.get(FIELD_SQL).asText();
+ if (StringUtils.isEmpty(sql)) {
+ return Collections.emptyList();
+ }
+
+ LinkedHashMap<String, DataType> fieldTypes = new LinkedHashMap<>();
+ SQLStatement sqlStatement = SQLUtils.parseSingleStatement(sql, DbType.mysql);
+
+ if (sqlStatement instanceof SQLAlterTableStatement) {
+ SQLAlterTableStatement sqlAlterTableStatement = (SQLAlterTableStatement) sqlStatement;
+ for (SQLAlterTableItem sqlAlterTableItem : sqlAlterTableStatement.getItems()) {
+ extractFieldTypesFromAlterTableItem(sqlAlterTableItem, fieldTypes);
+ }
+ }
+
+ return Collections.singletonList(
+ new RichCdcMultiplexRecord(
+ CdcRecord.emptyRecord(), fieldTypes, databaseName, tableName));
+ }
+
+ private void extractFieldTypesFromAlterTableItem(
+ SQLAlterTableItem sqlAlterTableItem, LinkedHashMap<String, DataType> fieldTypes) {
+ if (sqlAlterTableItem instanceof SQLAlterTableAddColumn) {
+ SQLAlterTableAddColumn sqlAlterTableAddColumn =
+ (SQLAlterTableAddColumn) sqlAlterTableItem;
+ List<SQLColumnDefinition> columns = sqlAlterTableAddColumn.getColumns();
+
+ for (SQLColumnDefinition column : columns) {
+ fieldTypes.put(getColumnName(column), getPaimonDataType(column));
+ }
+ } else if (sqlAlterTableItem instanceof SQLAlterTableDropColumnItem) {
+ // ignore
+ } else if (sqlAlterTableItem instanceof MySqlAlterTableModifyColumn) {
+ MySqlAlterTableModifyColumn mySqlAlterTableModifyColumn =
+ (MySqlAlterTableModifyColumn) sqlAlterTableItem;
+ SQLColumnDefinition newColumnDefinition =
+ mySqlAlterTableModifyColumn.getNewColumnDefinition();
+
+ fieldTypes.put(
+ getColumnName(newColumnDefinition), getPaimonDataType(newColumnDefinition));
+ } else if (sqlAlterTableItem instanceof MySqlAlterTableChangeColumn) {
+ MySqlAlterTableChangeColumn mySqlAlterTableChangeColumn =
+ (MySqlAlterTableChangeColumn) sqlAlterTableItem;
+ SQLColumnDefinition newColumnDefinition =
+ mySqlAlterTableChangeColumn.getNewColumnDefinition();
+
+ fieldTypes.put(
+ getColumnName(newColumnDefinition), getPaimonDataType(newColumnDefinition));
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported ALTER TABLE type: "
+ + sqlAlterTableItem.getClass().getSimpleName());
+ }
+ }
+
+ private String getColumnName(SQLColumnDefinition column) {
+ return toFieldName(StringUtils.replace(column.getColumnName(), "`", ""));
+ }
+
+ private DataType getPaimonDataType(SQLColumnDefinition column) {
+ return toPaimonDataType(
+ column.getDataType().toString(), !column.containsNotNullConstaint());
+ }
+
+ private String toFieldName(String rawName) {
+ return StringUtils.caseSensitiveConversion(rawName, caseSensitive);
+ }
+
+ private DataType toPaimonDataType(String mySqlType, boolean isNullable) {
+ return MySqlTypeUtils.toDataType(mySqlType).copy(isNullable);
+ }
+
+ private LinkedHashMap<String, String> extractFieldTypesFromMySqlType() {
+ LinkedHashMap<String, String> fieldTypes = new LinkedHashMap<>();
+
+ JsonNode schema = root.get(FIELD_MYSQL_TYPE);
+ Iterator<String> iterator = schema.fieldNames();
+ while (iterator.hasNext()) {
+ String fieldName = iterator.next();
+ String fieldType = schema.get(fieldName).asText();
+ fieldTypes.put(toFieldName(fieldName), fieldType);
+ }
+
+ return fieldTypes;
+ }
+
+ private Map<String, String> extractRow(JsonNode record, Map<String, String> mySqlFieldTypes) {
+ Map<String, Object> jsonMap =
+ objectMapper.convertValue(record, new TypeReference<Map<String, Object>>() {});
+ if (jsonMap == null) {
+ return new HashMap<>();
+ }
+
+ Map<String, String> resultMap = new HashMap<>();
+ for (Map.Entry<String, String> field : mySqlFieldTypes.entrySet()) {
+ String fieldName = field.getKey();
+ String mySqlType = field.getValue();
+ Object objectValue = jsonMap.get(fieldName);
+ if (objectValue == null) {
+ continue;
+ }
+
+ String oldValue = objectValue.toString();
+ String newValue = oldValue;
+
+ if (MySqlTypeUtils.isSetType(MySqlTypeUtils.getShortType(mySqlType))) {
+ newValue = CanalFieldParser.convertSet(newValue, mySqlType);
+ } else if (MySqlTypeUtils.isEnumType(MySqlTypeUtils.getShortType(mySqlType))) {
+ newValue = CanalFieldParser.convertEnum(newValue, mySqlType);
+ } else if (MySqlTypeUtils.isGeoType(MySqlTypeUtils.getShortType(mySqlType))) {
+ try {
+ byte[] wkb =
+ CanalFieldParser.convertGeoType2WkbArray(
+ oldValue.getBytes(StandardCharsets.ISO_8859_1));
+ newValue = MySqlTypeUtils.convertWkbArray(wkb);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format("Failed to convert %s to geometry JSON.", oldValue), e);
+ }
+ }
+ resultMap.put(fieldName, newValue);
+ }
+
+ // generate values for computed columns
+ for (ComputedColumn computedColumn : computedColumns) {
+ resultMap.put(
+ computedColumn.columnName(),
+ computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
+ }
+
+ return resultMap;
+ }
+
+ private Map<String, String> keyCaseInsensitive(Map<String, String> origin) {
+ Map<String, String> keyCaseInsensitive = new HashMap<>();
+ for (Map.Entry<String, String> entry : origin.entrySet()) {
+ String fieldName = entry.getKey().toLowerCase();
+ checkArgument(
+ !keyCaseInsensitive.containsKey(fieldName),
+ "Duplicate key appears when converting map keys to case-insensitive form. Original map is:\n%s",
+ origin);
+ keyCaseInsensitive.put(fieldName, entry.getValue());
+ }
+ return keyCaseInsensitive;
+ }
+}
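Because the parser is now a plain FlatMapFunction rather than an EventParser, it can be exercised without Kafka or a running Flink job. A minimal sketch, assuming the imports shown above and an enclosing method declared `throws Exception`; the payload is a trimmed-down version of the canal-json INSERT event from the deleted KafkaEventParserTest:

    CanalRecordParser parser = new CanalRecordParser(true, Collections.emptyList());
    String insertEvent =
            "{\"data\":[{\"id\":\"1\",\"v1\":\"one\"}],\"database\":\"paimon_sync_table\","
                    + "\"isDdl\":false,\"mysqlType\":{\"id\":\"INT\",\"v1\":\"VARCHAR(10)\"},"
                    + "\"old\":null,\"table\":\"schema_evolution_1\",\"type\":\"INSERT\"}";
    List<RichCdcMultiplexRecord> collected = new ArrayList<>();
    parser.flatMap(
            insertEvent,
            new Collector<RichCdcMultiplexRecord>() {
                @Override
                public void collect(RichCdcMultiplexRecord record) {
                    collected.add(record);
                }

                @Override
                public void close() {}
            });
    // collected now holds a single +I record for schema_evolution_1 whose
    // fieldTypes were derived from "mysqlType" (id -> INT, v1 -> VARCHAR(10)),
    // with no DDL statement involved.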
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index bba93eaa2..f8ec80618 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -122,12 +122,6 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
return tableNameConverter.convert(tableName);
}
- @Override
- public String parseDatabaseName() {
- String databaseName = payload.get("source").get("db").asText();
- return tableNameConverter.convert(databaseName);
- }
-
private boolean isSchemaChange() {
return payload.get("op") == null;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
index c4b7ea7a9..363c5490e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Experimental;
import org.apache.paimon.types.RowKind;
import java.io.Serializable;
+import java.util.Collections;
import java.util.Map;
import java.util.Objects;
@@ -39,6 +40,10 @@ public class CdcRecord implements Serializable {
this.fields = fields;
}
+ public static CdcRecord emptyRecord() {
+ return new CdcRecord(RowKind.INSERT, Collections.emptyMap());
+ }
+
public RowKind kind() {
return kind;
}
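emptyRecord() is what lets a pure schema event travel the same path as data: extractRecordsFromDdl in CanalRecordParser wraps it together with the field types parsed from the ALTER statement, and RichEventParser later emits no rows for it because its field map is empty. A sketch (fieldTypes, databaseName and tableName assumed in scope):

    CdcRecord ddlCarrier = CdcRecord.emptyRecord(); // +I with no fields
    new RichCdcMultiplexRecord(ddlCarrier, fieldTypes, databaseName, tableName);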
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
index 136ff32b7..e62864ee9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
@@ -42,10 +42,6 @@ public interface EventParser<T> {
throw new UnsupportedOperationException("Table name is not supported in this parser.");
}
- default String parseDatabaseName() {
- throw new UnsupportedOperationException("Database name is not supported in this parser.");
- }
-
/**
* Parse new schema if this event contains schema change.
*
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
new file mode 100644
index 000000000..c8d09f4d0
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.types.DataType;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Objects;
+
+/** Compared to {@link CdcMultiplexRecord}, this contains schema information. */
+public class RichCdcMultiplexRecord implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final CdcRecord cdcRecord;
+ private final LinkedHashMap<String, DataType> fieldTypes;
+ private final String databaseName;
+ private final String tableName;
+
+ public RichCdcMultiplexRecord(
+ CdcRecord cdcRecord,
+ LinkedHashMap<String, DataType> fieldTypes,
+ String databaseName,
+ String tableName) {
+ this.cdcRecord = cdcRecord;
+ this.fieldTypes = fieldTypes;
+ this.databaseName = databaseName;
+ this.tableName = tableName;
+ }
+
+ public String tableName() {
+ return tableName;
+ }
+
+ public RichCdcRecord toRichCdcRecord() {
+ return new RichCdcRecord(cdcRecord, fieldTypes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(cdcRecord, fieldTypes, databaseName, tableName);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RichCdcMultiplexRecord that = (RichCdcMultiplexRecord) o;
+ return Objects.equals(cdcRecord, that.cdcRecord)
+ && Objects.equals(fieldTypes, that.fieldTypes)
+ && databaseName.equals(that.databaseName)
+ && tableName.equals(that.tableName);
+ }
+
+ @Override
+ public String toString() {
+ return "{"
+ + "cdcRecord="
+ + cdcRecord
+ + ", fieldTypes="
+ + fieldTypes
+ + ", databaseName="
+ + databaseName
+ + ", tableName="
+ + tableName
+ + '}';
+ }
+}
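A LinkedHashMap is used for the types so that field order from the source survives the trip through the record, which matters when new columns are appended. A construction sketch (fieldValues is an assumed Map<String, String> of column values):

    LinkedHashMap<String, DataType> fieldTypes = new LinkedHashMap<>();
    fieldTypes.put("pt", DataTypes.INT());
    fieldTypes.put("v1", DataTypes.VARCHAR(10));
    RichCdcMultiplexRecord record =
            new RichCdcMultiplexRecord(
                    new CdcRecord(RowKind.INSERT, fieldValues), fieldTypes, "mydb", "mytable");
    record.tableName();       // "mytable", used to route to a per-table parser
    record.toRichCdcRecord(); // drops the database/table wrapper, keeps record + types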
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
similarity index 58%
copy from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 7288138bc..105f45be4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -18,46 +18,40 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
-import org.apache.paimon.utils.ObjectUtils;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
-import java.util.Optional;
+import java.util.Map;
-/** Testing {@link EventParser} for {@link TestCdcEvent}. */
-public class TestCdcEventParser implements EventParser<TestCdcEvent> {
+/** {@link EventParser} for {@link RichCdcMultiplexRecord}. */
+public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMultiplexRecord> {
- private TestCdcEvent raw;
+ // TODO: currently we don't consider database
+ private String currentTable;
+ private RichEventParser currentParser;
+
+ private final Map<String, RichEventParser> parsers = new HashMap<>();
@Override
- public void setRawEvent(TestCdcEvent raw) {
- this.raw = raw;
+ public void setRawEvent(RichCdcMultiplexRecord record) {
+ this.currentTable = record.tableName();
+ this.currentParser = parsers.computeIfAbsent(currentTable, t -> new RichEventParser());
+ currentParser.setRawEvent(record.toRichCdcRecord());
}
@Override
public String parseTableName() {
- return raw.tableName();
+ return currentTable;
}
@Override
public List<DataField> parseSchemaChange() {
- return ObjectUtils.coalesce(raw.updatedDataFields(), Collections.emptyList());
- }
-
- @Override
- public String parseDatabaseName() {
- return null;
+ return currentParser.parseSchemaChange();
}
@Override
public List<CdcRecord> parseRecords() {
- return ObjectUtils.coalesce(raw.records(), Collections.emptyList());
- }
-
- @Override
- public Optional<Schema> parseNewTable(String databaseName) {
- return Optional.empty();
+ return currentParser.parseRecords();
}
}
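The multiplex parser is essentially a router: it keeps one RichEventParser per table, so each table's previously seen field types are tracked independently and a schema change in one table cannot mask one in another. A usage sketch (recordForA and recordForB are assumed RichCdcMultiplexRecords for tables "a" and "b"):

    RichCdcMultiplexRecordEventParser parser = new RichCdcMultiplexRecordEventParser();
    parser.setRawEvent(recordForA); // lazily creates and caches the parser for "a"
    parser.parseSchemaChange();     // fields of "a" that are new or retyped
    parser.setRawEvent(recordForB); // a separate cached parser for "b"
    parser.parseTableName();        // "b"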
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcParserFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcParserFactory.java
deleted file mode 100644
index 3cc9cae08..000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcParserFactory.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink.cdc;
-
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/** A {@link EventParser.Factory} for {@link RichCdcRecord}. */
-public class RichCdcParserFactory implements EventParser.Factory<RichCdcRecord> {
-
- @Override
- public RichEventParser create() {
- return new RichEventParser();
- }
-
- private static class RichEventParser implements EventParser<RichCdcRecord> {
-
- private RichCdcRecord record;
-
- private final Map<String, DataType> previousDataFields = new HashMap<>();
-
- @Override
- public void setRawEvent(RichCdcRecord rawEvent) {
- this.record = rawEvent;
- }
-
- @Override
- public List<DataField> parseSchemaChange() {
- List<DataField> change = new ArrayList<>();
- record.fieldTypes()
- .forEach(
- (field, type) -> {
- DataType previous = previousDataFields.get(field);
- if (!Objects.equals(previous, type)) {
- previousDataFields.put(field, type);
- change.add(new DataField(0, field, type));
- }
- });
- return change;
- }
-
- @Override
- public List<CdcRecord> parseRecords() {
- return Collections.singletonList(record.toCdcRecord());
- }
- }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
index 7d02c8a60..37e9271d0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
@@ -24,6 +24,7 @@ import org.apache.paimon.types.RowKind;
import java.io.Serializable;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
@@ -33,31 +34,28 @@ public class RichCdcRecord implements Serializable {
private static final long serialVersionUID = 1L;
- private final RowKind kind;
- private final Map<String, DataType> fieldTypes;
- private final Map<String, String> fieldValues;
+ private final CdcRecord cdcRecord;
+ private final LinkedHashMap<String, DataType> fieldTypes;
- public RichCdcRecord(
- RowKind kind, Map<String, DataType> fieldTypes, Map<String, String> fieldValues) {
- this.kind = kind;
+ public RichCdcRecord(CdcRecord cdcRecord, LinkedHashMap<String, DataType> fieldTypes) {
+ this.cdcRecord = cdcRecord;
this.fieldTypes = fieldTypes;
- this.fieldValues = fieldValues;
}
- public RowKind kind() {
- return kind;
+ public boolean hasPayload() {
+ return !cdcRecord.fields().isEmpty();
}
- public Map<String, DataType> fieldTypes() {
- return fieldTypes;
+ public RowKind kind() {
+ return cdcRecord.kind();
}
- public Map<String, String> fieldValues() {
- return fieldValues;
+ public LinkedHashMap<String, DataType> fieldTypes() {
+ return fieldTypes;
}
public CdcRecord toCdcRecord() {
- return new CdcRecord(kind, fieldValues);
+ return cdcRecord;
}
@Override
@@ -69,26 +67,17 @@ public class RichCdcRecord implements Serializable {
return false;
}
RichCdcRecord that = (RichCdcRecord) o;
- return kind == that.kind
- && Objects.equals(fieldTypes, that.fieldTypes)
- && Objects.equals(fieldValues, that.fieldValues);
+ return Objects.equals(cdcRecord, that.cdcRecord) && Objects.equals(fieldTypes, that.fieldTypes);
}
@Override
public int hashCode() {
- return Objects.hash(kind, fieldTypes, fieldValues);
+ return Objects.hash(cdcRecord, fieldTypes);
}
@Override
public String toString() {
- return "{"
- + "kind="
- + kind
- + ", fieldTypes="
- + fieldTypes
- + ", fieldValues="
- + fieldValues
- + '}';
+ return "{" + "cdcRecord=" + cdcRecord + ", fieldTypes=" + fieldTypes + '}';
}
public static Builder builder(RowKind kind) {
@@ -99,7 +88,7 @@ public class RichCdcRecord implements Serializable {
public static class Builder {
private final RowKind kind;
- private final Map<String, DataType> fieldTypes = new HashMap<>();
+ private final LinkedHashMap<String, DataType> fieldTypes = new LinkedHashMap<>();
private final Map<String, String> fieldValues = new HashMap<>();
public Builder(RowKind kind) {
@@ -113,7 +102,7 @@ public class RichCdcRecord implements Serializable {
}
public RichCdcRecord build() {
- return new RichCdcRecord(kind, fieldTypes, fieldValues);
+ return new RichCdcRecord(new CdcRecord(kind, fieldValues), fieldTypes);
}
}
}
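After the refactoring RichCdcRecord is a thin wrapper: the row payload lives in the embedded CdcRecord and only the ordered field types are added on top. The diff elides the builder's field-adding method, so this sketch shows only the shape that is visible above:

    RichCdcRecord empty = RichCdcRecord.builder(RowKind.INSERT).build();
    empty.hasPayload(); // false: no field values, so downstream emits no rows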
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
index 5648b695a..774895757 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcSinkBuilder.java
@@ -68,7 +68,7 @@ public class RichCdcSinkBuilder {
CdcSinkBuilder<RichCdcRecord> builder = new CdcSinkBuilder<>();
return builder.withTable(table)
.withInput(input)
- .withParserFactory(new RichCdcParserFactory())
+ .withParserFactory(RichEventParser::new)
.withParallelism(parallelism)
.withIdentifier(identifier)
.withCatalogLoader(catalogLoader)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
new file mode 100644
index 000000000..c98ead49f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Objects;
+
+/** An {@link EventParser} for {@link RichCdcRecord}. */
+public class RichEventParser implements EventParser<RichCdcRecord> {
+
+ private RichCdcRecord record;
+
+ private final LinkedHashMap<String, DataType> previousDataFields = new LinkedHashMap<>();
+
+ @Override
+ public void setRawEvent(RichCdcRecord rawEvent) {
+ this.record = rawEvent;
+ }
+
+ @Override
+ public List<DataField> parseSchemaChange() {
+ List<DataField> change = new ArrayList<>();
+ record.fieldTypes()
+ .forEach(
+ (field, type) -> {
+ DataType previous = previousDataFields.get(field);
+ if (!Objects.equals(previous, type)) {
+ previousDataFields.put(field, type);
+ change.add(new DataField(0, field, type));
+ }
+ });
+ return change;
+ }
+
+ @Override
+ public List<CdcRecord> parseRecords() {
+ if (record.hasPayload()) {
+ return Collections.singletonList(record.toCdcRecord());
+ } else {
+ return Collections.emptyList();
+ }
+ }
+}
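This parser is the heart of the missing-DDL fix: it remembers the last DataType seen for every field and reports a DataField only when a field is new or its type differs, so a type change observed in ordinary data records triggers schema evolution on its own. A sketch of the intended behavior (recordWith is a hypothetical helper building a RichCdcRecord with one typed field):

    RichEventParser parser = new RichEventParser();

    parser.setRawEvent(recordWith("v", DataTypes.INT()));
    parser.parseSchemaChange(); // [v INT]: first sighting of the field

    parser.setRawEvent(recordWith("v", DataTypes.INT()));
    parser.parseSchemaChange(); // []: nothing changed, no evolution

    parser.setRawEvent(recordWith("v", DataTypes.BIGINT()));
    parser.parseSchemaChange(); // [v BIGINT]: widened without any DDL record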
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index fd7569acf..1472021b0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -57,10 +57,21 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
@Test
@Timeout(60)
public void testSchemaEvolution() throws Exception {
+ runSingleTableSchemaEvolution("schemaevolution");
+ }
+
+ @Test
+ @Timeout(60)
+ public void testSchemaEvolutionWithMissingDdl() throws Exception {
+ runSingleTableSchemaEvolution("schemaevolutionmissingddl");
+ }
+
+ private void runSingleTableSchemaEvolution(String sourceDir) throws Exception {
final String topic = "schema_evolution";
createTestTopic(topic, 1, 1);
// ---------- Write the Canal json into Kafka -------------------
- List<String> lines = readLines("kafka.canal/table/schemaevolution/canal-data-1.txt");
+ List<String> lines =
+ readLines(String.format("kafka.canal/table/%s/canal-data-1.txt", sourceDir));
try {
writeRecordsToKafka(topic, lines);
} catch (Exception e) {
@@ -95,10 +106,10 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
waitJobRunning(client);
- testSchemaEvolutionImpl(topic);
+ testSchemaEvolutionImpl(topic, sourceDir);
}
- private void testSchemaEvolutionImpl(String topic) throws Exception {
+ private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exception {
FileStoreTable table = getFileStoreTable();
RowType rowType =
@@ -115,7 +126,8 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
try {
writeRecordsToKafka(
- topic, readLines("kafka.canal/table/schemaevolution/canal-data-2.txt"));
+ topic,
+ readLines(String.format("kafka.canal/table/%s/canal-data-2.txt", sourceDir)));
} catch (Exception e) {
throw new Exception("Failed to write canal data to Kafka.", e);
}
@@ -140,7 +152,8 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
try {
writeRecordsToKafka(
- topic, readLines("kafka.canal/table/schemaevolution/canal-data-3.txt"));
+ topic,
+ readLines(String.format("kafka.canal/table/%s/canal-data-3.txt", sourceDir)));
} catch (Exception e) {
throw new Exception("Failed to write canal data to Kafka.", e);
}
@@ -166,7 +179,8 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
try {
writeRecordsToKafka(
- topic, readLines("kafka.canal/table/schemaevolution/canal-data-4.txt"));
+ topic,
+ readLines(String.format("kafka.canal/table/%s/canal-data-4.txt", sourceDir)));
} catch (Exception e) {
throw new Exception("Failed to write canal data to Kafka.", e);
}
@@ -196,7 +210,8 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
try {
writeRecordsToKafka(
- topic, readLines("kafka.canal/table/schemaevolution/canal-data-5.txt"));
+ topic,
+ readLines(String.format("kafka.canal/table/%s/canal-data-5.txt", sourceDir)));
} catch (Exception e) {
throw new Exception("Failed to write canal data to Kafka.", e);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaEventParserTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaEventParserTest.java
deleted file mode 100644
index 168f92641..000000000
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaEventParserTest.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action.cdc.kafka;
-
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericArray;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.action.cdc.TableNameConverter;
-import org.apache.paimon.flink.action.cdc.kafka.canal.CanalJsonEventParser;
-import org.apache.paimon.flink.action.cdc.mysql.Expression;
-import org.apache.paimon.flink.sink.cdc.EventParser;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-/** Tests for {@link CanalJsonEventParser}. */
-public class KafkaEventParserTest {
- private static final String CANAL_JSON_EVENT =
- "{\"data\":[{\"pt\":\"1\",\"_ID\":\"1\",\"v1\":\"one\","
- + "\"_geometrycollection\":\"\\u0000\\u0000\\u0000\\u0000\\u0001\\u0007\\u0000\\u0000\\u0000\\u0003"
- + "\\u0000\\u0000\\u0000\\u0001\\u0001\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000"
- + "\\u0000$@\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000$@\\u0001\\u0001\\u0000\\u0000\\u0000\\u0000"
- + "\\u0000\\u0000\\u0000\\u0000\\u0000>@\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000>@\\u0001\\u0002"
- + "\\u0000\\u0000\\u0000\\u0002\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000"
- + ".@\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000"
- + ".@\\u0000\\u0000\\u0000\\u0000\\u0000\\u00004@\\u0000\\u0000\\u0000\\u0000\\u0000\\u00004@\","
- + "\"_set\":\"3\",\"_enum\":\"1\"}],\"database\":\"paimon_sync_table\",\"es\":1683006706000,"
- + "\"id\":92,\"isDdl\":false,\"mysqlType\":{\"pt\":\"INT\",\"_ID\":\"INT\",\"v1\":\"VARCHAR(10)\","
- + "\"_geometrycollection\":\"GEOMETRYCOLLECTION\",\"_set\":\"SET('a','b','c','d')\",\"_enum\":\"ENUM"
- + "('value1','value2','value3')\"},\"old\":null,\"pkNames\":[\"_id\"],\"sql\":\"\","
- + "\"sqlType\":{\"pt\":4,\"_id\":4,\"v1\":12},\"table\":\"schema_evolution_1\",\"ts\":1683006706728,"
- + "\"type\":\"INSERT\"}\n"
- + ":null,\"database\":\"paimon_sync_table\",\"es\":1683168078000,\"id\":40,\"isDdl\":true,"
- + "\"mysqlType\":null,\"old\":null,\"pkNames\":null,\"sql\":\"/* Query from "
- + "DMS-WEBSQL-0-Qid_18293315143325386Y by user 1486767996652600 */ ALTER TABLE schema_evolution_1 "
- + "MODIFY COLUMN v2 BIGINT\",\"sqlType\":null,\"table\":\"schema_evolution_1\",\"ts\":1683168078956,"
- + "\"type\":\"ALTER\"}";
- private static final String CANAL_JSON_DDL_ADD_EVENT =
- "{\"data\":null,\"database\":\"paimon_sync_table\","
- + "\"es\":1683008289000,\"id\":13,\"isDdl\":true,\"mysqlType\":null,\"old\":null,\"pkNames\":null,"
- + "\"sql\":\" ALTER TABLE "
- + "schema_evolution_1 ADD COLUMN v2 INT\",\"sqlType\":null,\"table\":\"schema_evolution_1\","
- + "\"ts\":1683008289401,\"type\":\"ALTER\"}";
-
- private static final String CANAL_JSON_DDL_MODIFY_EVENT =
- "{\"data\":null,\"database\":\"paimon_sync_table\",\"es\":1683168155000,\"id\":54,\"isDdl\":true,"
- + "\"mysqlType\":null,\"old\":null,\"pkNames\":null,\"sql\":\" ALTER TABLE schema_evolution_1 MODIFY "
- + "COLUMN v1 VARCHAR(20)\",\"sqlType\":null,\"table\":\"schema_evolution_1\",\"ts\":1683168154943,"
- + "\"type\":\"ALTER\"}";
-
- private static final String CANAL_JSON_DDL_DROP_EVENT =
- "{\"data\":null,\"database\":\"paimon_sync_table\",\"es\":1683168155000,\"id\":54,\"isDdl\":true,"
- + "\"mysqlType\":null,\"old\":null,\"pkNames\":null,\"sql\":\" ALTER TABLE schema_evolution_1 DROP "
- + "COLUMN v1 \",\"sqlType\":null,\"table\":\"schema_evolution_1\",\"ts\":1683168154943,"
- + "\"type\":\"ALTER\"}";
-
- private static final String CANAL_JSON_DDL_CHANGE_EVENT =
- "{\"data\":null,\"database\":\"paimon_sync_table\",\"es\":1683168155000,\"id\":54,\"isDdl\":true,"
- + "\"mysqlType\":null,\"old\":null,\"pkNames\":null,\"sql\":\" ALTER TABLE schema_evolution_1 CHANGE "
- + "COLUMN `$% ^,& *(` cg VARCHAR(20) \",\"sqlType\":null,\"table\":\"schema_evolution_1\",\"ts\":1683168154943,"
- + "\"type\":\"ALTER\"}";
-
- private static final String CANAL_JSON_DDL_MULTI_ADD_EVENT =
- "{\"data\":null,\"database\":\"paimon_sync_table\","
- + "\"es\":1683614798000,\"id\":2431,\"isDdl\":true,\"mysqlType\":null,\"old\":null,\"pkNames\":null,"
- + "\"sql\":\" ALTER TABLE "
- + "schema_evolution_multiple \\nADD v4 INT,\\nMODIFY COLUMN v1 VARCHAR(30),\\nADD COLUMN (v5 DOUBLE, v6 "
- + "DECIMAL(5, 3), `$% ^,& *(` VARCHAR(10) COMMENT 'test'),\\n MODIFY v2 BIGINT\",\"sqlType\":null,"
- + "\"table\":\"schema_evolution_multiple\",\"ts\":1683614799031,\"type\":\"ALTER\"}\n";
-
- private static final String DEBEZIUM_JSON_EVENT =
- "{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\","
- + "\"description\":\"Small 2-wheel scooter\",\"weight\":3.140000104904175},\"source\":{\"version\":\"1.1.1"
- + ".Final\",\"connector\":\"mysql\",\"name\":\"dbserver1\",\"ts_ms\":0,\"snapshot\":\"true\","
- + "\"db\":\"inventory\",\"table\":\"products\",\"server_id\":0,\"gtid\":null,\"file\":\"mysql-bin.000003\","
- + "\"pos\":154,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"c\",\"ts_ms\":1589355606100,"
- + "\"transaction\":null}\n";
-
- @Test
- public void testCanalJsonEventParser() {
- boolean caseSensitive = false;
- EventParser<String> parser =
- new CanalJsonEventParser(caseSensitive, new TableNameConverter(caseSensitive));
- parser.setRawEvent(CANAL_JSON_EVENT);
- List<DataField> dataFields = new ArrayList<>();
- dataFields.add(new DataField(0, "pt", DataTypes.INT()));
- dataFields.add(new DataField(1, "_id", DataTypes.INT()));
- dataFields.add(new DataField(2, "v1", DataTypes.VARCHAR(10)));
- dataFields.add(new DataField(3, "_geometrycollection", DataTypes.STRING()));
- dataFields.add(new DataField(4, "_set", DataTypes.ARRAY(DataTypes.STRING())));
- dataFields.add(new DataField(5, "_enum", DataTypes.STRING()));
- assertThat(parser.parseSchemaChange()).isEmpty();
- List<GenericRow> result =
- parser.parseRecords().stream()
- .map(record -> toGenericRow(record, dataFields).get())
- .collect(Collectors.toList());
- BinaryString[] binaryStrings =
- new BinaryString[] {BinaryString.fromString("a"), BinaryString.fromString("b")};
- GenericArray genericArray = new GenericArray(binaryStrings);
-
- List<GenericRow> expect =
- Collections.singletonList(
- GenericRow.of(
- 1,
- 1,
- BinaryString.fromString("one"),
- BinaryString.fromString(
- "{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}"),
- genericArray,
- BinaryString.fromString("value1")));
- assertThat(result).isEqualTo(expect);
- }
-
- @Test
- public void testCanalJsonEventParserDdl() {
- boolean caseSensitive = false;
- EventParser<String> parser =
- new CanalJsonEventParser(caseSensitive, new TableNameConverter(caseSensitive));
- parser.setRawEvent(CANAL_JSON_EVENT);
- List<DataField> expectDataFields = new ArrayList<>();
- expectDataFields.add(new DataField(0, "v2", DataTypes.INT()));
- assertThat(parser.parseSchemaChange()).isEmpty();
- parser.setRawEvent(CANAL_JSON_DDL_ADD_EVENT);
- List<DataField> updatedDataFieldsAdd = parser.parseSchemaChange();
- assertThat(updatedDataFieldsAdd).isEqualTo(expectDataFields);
-
- expectDataFields.clear();
- expectDataFields.add(new DataField(0, "v1", DataTypes.VARCHAR(20)));
- parser.setRawEvent(CANAL_JSON_DDL_MODIFY_EVENT);
- List<DataField> updatedDataFieldsModify = parser.parseSchemaChange();
- assertThat(updatedDataFieldsModify).isEqualTo(expectDataFields);
- expectDataFields.clear();
-
- expectDataFields.add(new DataField(0, "v4", DataTypes.INT()));
- expectDataFields.add(new DataField(1, "v1", DataTypes.VARCHAR(30)));
-
- expectDataFields.add(new DataField(2, "v5", DataTypes.DOUBLE()));
- expectDataFields.add(new DataField(3, "v6", DataTypes.DECIMAL(5, 3)));
- expectDataFields.add(new DataField(4, "$% ^,& *(", DataTypes.VARCHAR(10)));
- expectDataFields.add(new DataField(5, "v2", DataTypes.BIGINT()));
-
- parser.setRawEvent(CANAL_JSON_DDL_MULTI_ADD_EVENT);
- List<DataField> updatedDataFieldsMulti = parser.parseSchemaChange();
- assertThat(updatedDataFieldsMulti).isEqualTo(expectDataFields);
- expectDataFields.clear();
- parser.setRawEvent(CANAL_JSON_DDL_CHANGE_EVENT);
- List<DataField> updatedDataFieldsChange = parser.parseSchemaChange();
- expectDataFields.add(new DataField(0, "cg", DataTypes.VARCHAR(20)));
- assertThat(updatedDataFieldsChange).isEqualTo(expectDataFields);
- }
-
- @Test
- public void testCanalJsonEventParserParseDebeziumJson() {
- boolean caseSensitive = true;
- EventParser<String> parser =
- new CanalJsonEventParser(caseSensitive, new TableNameConverter(caseSensitive));
- parser.setRawEvent(DEBEZIUM_JSON_EVENT);
- RuntimeException e =
- assertThrows(
- RuntimeException.class, parser::parseRecords, "Expecting RuntimeException");
- assertThat(e)
- .hasMessage(
- "CanalJsonEventParser only supports canal-json format,please make sure that your topic's format is accurate.");
- }
-
- @Test
- public void testCaseSensitive() {
- EventParser<String> parserCaseInsensitive =
- new CanalJsonEventParser(false, new TableNameConverter(false));
- EventParser<String> parserCaseSensitive =
- new CanalJsonEventParser(true, new TableNameConverter(true));
- parserCaseInsensitive.setRawEvent(CANAL_JSON_EVENT);
- List<DataField> dataFields = new ArrayList<>();
- dataFields.add(new DataField(0, "pt", DataTypes.INT()));
- dataFields.add(new DataField(1, "_id", DataTypes.INT()));
- dataFields.add(new DataField(2, "v1", DataTypes.VARCHAR(10)));
- dataFields.add(new DataField(3, "_geometrycollection", DataTypes.STRING()));
- dataFields.add(new DataField(4, "_set", DataTypes.ARRAY(DataTypes.STRING())));
- dataFields.add(new DataField(5, "_enum", DataTypes.STRING()));
- assertThat(parserCaseInsensitive.parseSchemaChange()).isEmpty();
- List<GenericRow> result =
- parserCaseInsensitive.parseRecords().stream()
- .map(record -> toGenericRow(record, dataFields).get())
- .collect(Collectors.toList());
- BinaryString[] binaryStrings =
- new BinaryString[] {BinaryString.fromString("a"), BinaryString.fromString("b")};
- GenericArray genericArray = new GenericArray(binaryStrings);
-
- List<GenericRow> expect =
- Collections.singletonList(
- GenericRow.of(
- 1,
- 1,
- BinaryString.fromString("one"),
- BinaryString.fromString(
- "{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}"),
- genericArray,
- BinaryString.fromString("value1")));
- assertThat(result).isEqualTo(expect);
- parserCaseSensitive.setRawEvent(CANAL_JSON_EVENT);
- Optional<GenericRow> genericRow =
- toGenericRow(parserCaseSensitive.parseRecords().get(0), dataFields);
- assert !genericRow.isPresent();
- dataFields.remove(1);
- dataFields.add(1, new DataField(1, "_ID", DataTypes.INT()));
- List<GenericRow> resultCaseSensitive =
- parserCaseSensitive.parseRecords().stream()
- .map(record -> toGenericRow(record, dataFields).get())
- .collect(Collectors.toList());
- assertThat(resultCaseSensitive).isEqualTo(expect);
- }
-
- @Test
- public void testCanalJsonEventParserAndComputeColumn() {
- boolean caseSensitive = false;
- EventParser<String> parser =
- new CanalJsonEventParser(
- caseSensitive,
- new TableNameConverter(caseSensitive),
- Collections.singletonList(
- new ComputedColumn("v1", Expression.substring("v1", "1"))));
- parser.setRawEvent(CANAL_JSON_EVENT);
- List<DataField> dataFields = new ArrayList<>();
- dataFields.add(new DataField(0, "pt", DataTypes.INT()));
- dataFields.add(new DataField(1, "_id", DataTypes.INT()));
- dataFields.add(new DataField(2, "v1", DataTypes.VARCHAR(10)));
- dataFields.add(new DataField(3, "_geometrycollection", DataTypes.STRING()));
- dataFields.add(new DataField(4, "_set", DataTypes.ARRAY(DataTypes.STRING())));
- dataFields.add(new DataField(5, "_enum", DataTypes.STRING()));
- assertThat(parser.parseSchemaChange()).isEmpty();
- List<GenericRow> result =
- parser.parseRecords().stream()
- .map(record -> toGenericRow(record, dataFields).get())
- .collect(Collectors.toList());
- BinaryString[] binaryStrings =
- new BinaryString[] {BinaryString.fromString("a"), BinaryString.fromString("b")};
- GenericArray genericArray = new GenericArray(binaryStrings);
-
- List<GenericRow> expect =
- Collections.singletonList(
- GenericRow.of(
- 1,
- 1,
- BinaryString.fromString("ne"),
- BinaryString.fromString(
- "{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}"),
- genericArray,
- BinaryString.fromString("value1")));
- assertThat(result).isEqualTo(expect);
- }
-}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
index 7288138bc..e39a74d5c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
@@ -46,11 +46,6 @@ public class TestCdcEventParser implements EventParser<TestCdcEvent> {
return ObjectUtils.coalesce(raw.updatedDataFields(), Collections.emptyList());
}
- @Override
- public String parseDatabaseName() {
- return null;
- }
-
@Override
public List<CdcRecord> parseRecords() {
return ObjectUtils.coalesce(raw.records(), Collections.emptyList());
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-1.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-1.txt
new file mode 100644
index 000000000..8b6d5d6c4
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"pt":"1","_id":"1","v1":"one"}],"database":"paimon_sync_table","es":1683006706000,"id":92,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12},"table":"schema_evolution_1","ts":1683006706728,"type":"INSERT"}
+{"data":[{"pt":"1","_id":"2","v1":"two"},{"pt":"2","_id":"4","v1":"four"}],"database":"paimon_sync_table","es":1683006724000,"id":94,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12},"table":"schema_evolution_2","ts":1683006724404,"type":"INSERT"}
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-2.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-2.txt
new file mode 100644
index 000000000..842d5a37d
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-2.txt
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"pt":"2","_id":"3","v1":"three","v2":"30"},{"pt":"1","_id":"5","v1":"five","v2":"50"}],"database":"paimon_sync_table","es":1683008289000,"id":14,"isDdl":false,"mysqlType":{"pt":"int","_id":"int","v1":"varchar(10)","v2":"INT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":4},"table":"schema_evolution_1","ts":1683008289719,"type":"INSERT"}
+{"data":[{"pt":"1","_id":"6","v1":"six","v2":"60"}],"database":"paimon_sync_table","es":1683008290000,"id":16,"isDdl":false,"mysqlType":{"pt":"int","_id":"int","v1":"varchar(10)","v2":"INT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":4},"table":"schema_evolution_2","ts":1683008290391,"type":"INSERT"}
+{"data":[{"pt":"1","_id":"2","v1":"second","v2":null}],"database":"paimon_sync_table","es":1683008290000,"id":18,"isDdl":false,"mysqlType":{"pt":"int","_id":"int","v1":"varchar(10)","v2":"INT"},"old":[{"v1":"two"}],"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":4},"table":"schema_evolution_2","ts":1683008290626,"type":"UPDATE"}
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-3.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-3.txt
new file mode 100644
index 000000000..fc9547843
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-3.txt
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"pt":"2","_id":"7","v1":"seven","v2":"70000000000"}],"database":"paimon_sync_table","es":1683168079000,"id":41,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)","v2":"BIGINT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5},"table":"schema_evolution_1","ts":1683168079071,"type":"INSERT"}
+{"data":[{"pt":"1","_id":"5","v1":"five","v2":"50"}],"database":"paimon_sync_table","es":1683168079000,"id":42,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)","v2":"BIGINT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5},"table":"schema_evolution_1","ts":1683168079391,"type":"DELETE"}
+{"data":[{"pt":"2","_id":"3","v1":"three","v2":"30000000000"}],"database":"paimon_sync_table","es":1683168079000,"id":43,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)","v2":"BIGINT"},"old":[{"v2":"30"}],"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5},"table":"schema_evolution_1","ts":1683168079507,"type":"UPDATE"}
+{"data":[{"pt":"2","_id":"8","v1":"eight","v2":"80000000000"}],"database":"paimon_sync_table","es":1683168080000,"id":45,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(10)","v2":"BIGINT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5},"table":"schema_evolution_2","ts":1683168079943,"type":"INSERT"}
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-4.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-4.txt
new file mode 100644
index 000000000..45e4137e5
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-4.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"pt":"1","_id":"9","v1":"nine","v2":"90000000000","v3":"99999.999","v4":"nine.bin","v5":"9.9"}],"database":"paimon_sync_table","es":1683168155000,"id":55,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(20)","v2":"BIGINT","v3":"NUMERIC(8,3)","v4":"VARBINARY(10)","v5":"FLOAT"},"old":null,"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5,"v3":3,"v4":2004,"v5":7},"table":"schema_evolution_1","ts":1683168155270,"type":"INSERT"}
+{"data":[{"pt":"2","_id":"8","v1":"very long string","v2":"80000000000","v3":null,"v4":null,"v5":null}],"database":"paimon_sync_table","es":1683168156000,"id":60,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(20)","v2":"BIGINT","v3":"NUMERIC(8,3)","v4":"VARBINARY(10)","v5":"FLOAT"},"old":[{"v1":"eight"}],"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5,"v3":3,"v4":-3,"v5":7},"table":"schema_evolution_2","ts":1683168156456,"type":"UPDATE"}
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-5.txt b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-5.txt
new file mode 100644
index 000000000..bb262c876
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/kafka.canal/table/schemaevolutionmissingddl/canal-data-5.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"pt":"1","_id":"9","v1":"nine","v2":"90000000000","v3":"99999.999","v4":"nine.bin.long","v5":"9.00000000009"}],"database":"paimon_sync_table","es":1683357620000,"id":4710,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(20)","v2":"BIGINT","v3":"NUMERIC(8,3)","v4":"VARBINARY(20)","v5":"DOUBLE"},"old":[{"v4":"nine.bin","v5":"9.899999618530273"}],"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5,"v3":3,"v4":2004,"v5":8},"table":"schema_evolution_1 [...]
+{"data":[{"pt":"2","_id":"4","v1":"four","v2":null,"v3":null,"v4":"four.bin.long","v5":"4.00000000004"}],"database":"paimon_sync_table","es":1683357620000,"id":4710,"isDdl":false,"mysqlType":{"pt":"INT","_id":"INT","v1":"VARCHAR(20)","v2":"BIGINT","v3":"NUMERIC(8,3)","v4":"VARBINARY(20)","v5":"DOUBLE"},"old":[{"v4":null,"v5":null}],"pkNames":["_id"],"sql":"","sqlType":{"pt":4,"_id":4,"v1":12,"v2":-5,"v3":3,"v4":2004,"v5":8},"table":"schema_evolution_2","ts":1683357621014,"type":"UPDATE"}