You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/21 09:04:58 UTC
[inlong] branch master updated: [INLONG-6174][Sort] MySql connector support meta data with debezium format (#6210)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new eb48b847f [INLONG-6174][Sort] MySql connector support meta data with debezium format (#6210)
eb48b847f is described below
commit eb48b847f28d78e2d349730143912e9fedeca49d
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Fri Oct 21 17:04:51 2022 +0800
[INLONG-6174][Sort] MySql connector support meta data with debezium format (#6210)
---
.../org/apache/inlong/common/enums/MetaField.java | 22 ++++
.../org/apache/inlong/sort/protocol/Metadata.java | 4 +
.../protocol/node/extract/MySqlExtractNode.java | 13 ++-
.../sort/protocol/node/load/KafkaLoadNode.java | 10 +-
.../node/extract/MySqlExtractNodeTest.java | 9 +-
.../sort/protocol/node/load/KafkaLoadNodeTest.java | 3 +-
.../cdc/mysql/table/MySqlReadableMetadata.java | 124 ++++++++++++++++-----
.../apache/inlong/sort/parser/AllMigrateTest.java | 2 +-
inlong-sort/sort-formats/format-json/pom.xml | 4 +
.../sort/formats/json/debezium/DebeziumJson.java | 48 ++++++++
10 files changed, 199 insertions(+), 40 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
index 8563c2234..f88da9c4d 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java
@@ -61,14 +61,36 @@ public enum MetaField {
/**
* Represents a canal json of a record in database (in string format)
+ * @deprecated please use DATA_CANAL or DATA_DEBEZIUM
*/
DATA,
+ /**
+ * Represents a canal json of a record in database (in string format)
+ */
+ DATA_CANAL,
+
+ /**
+ * Represents a debezium json of a record in database (in string format)
+ */
+ DATA_DEBEZIUM,
+
/**
* Represents a canal json of a record in database (in bytes format)
+ * @deprecated please use DATA_BYTES_CANAL or DATA_BYTES_DEBEZIUM
*/
DATA_BYTES,
+ /**
+ * Represents a debezium json of a record in database (in bytes format)
+ */
+ DATA_BYTES_DEBEZIUM,
+
+ /**
+ * Represents a canal json of a record in database (in bytes format)
+ */
+ DATA_BYTES_CANAL,
+
/**
* The value of the field before update. Currently, it is used for MySQL database.
*/
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
index 496b852a2..f4822fb80 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/Metadata.java
@@ -78,7 +78,9 @@ public interface Metadata {
case TABLE_NAME:
case DATABASE_NAME:
case OP_TYPE:
+ case DATA_CANAL:
case DATA:
+ case DATA_DEBEZIUM:
case COLLECTION_NAME:
case SCHEMA_NAME:
metadataType = "STRING";
@@ -106,6 +108,8 @@ public interface Metadata {
metadataType = "ARRAY<MAP<STRING, STRING>>";
break;
case DATA_BYTES:
+ case DATA_BYTES_DEBEZIUM:
+ case DATA_BYTES_CANAL:
metadataType = "BYTES";
break;
default:
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
index e561610b8..f674ef8c3 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
@@ -280,7 +280,13 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
break;
case DATA:
case DATA_BYTES:
- metadataKey = "meta.data";
+ case DATA_CANAL:
+ case DATA_BYTES_CANAL:
+ metadataKey = "meta.data_canal";
+ break;
+ case DATA_DEBEZIUM:
+ case DATA_BYTES_DEBEZIUM:
+ metadataKey = "meta.data_debezium";
break;
case IS_DDL:
metadataKey = "meta.is_ddl";
@@ -317,9 +323,10 @@ public class MySqlExtractNode extends ExtractNode implements Metadata, InlongMet
@Override
public Set<MetaField> supportedMetaFields() {
- return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.DATA,
+ return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.DATA_CANAL,
MetaField.DATABASE_NAME, MetaField.OP_TYPE, MetaField.OP_TS, MetaField.IS_DDL,
MetaField.TS, MetaField.SQL_TYPE, MetaField.MYSQL_TYPE, MetaField.PK_NAMES,
- MetaField.BATCH_ID, MetaField.UPDATE_BEFORE, MetaField.DATA_BYTES);
+ MetaField.BATCH_ID, MetaField.UPDATE_BEFORE, MetaField.DATA_BYTES_DEBEZIUM,
+ MetaField.DATA_DEBEZIUM, MetaField.DATA_BYTES_CANAL, MetaField.DATA, MetaField.DATA_BYTES);
}
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
index 13b741217..be326b478 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
@@ -227,7 +227,8 @@ public class KafkaLoadNode extends LoadNode implements InlongMetric, Metadata, S
metadataKey = "value.op-type";
break;
case DATA:
- metadataKey = "value.data";
+ case DATA_CANAL:
+ metadataKey = "value.data_canal";
break;
case IS_DDL:
metadataKey = "value.is-ddl";
@@ -255,8 +256,9 @@ public class KafkaLoadNode extends LoadNode implements InlongMetric, Metadata, S
@Override
public Set<MetaField> supportedMetaFields() {
- return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.OP_TYPE, MetaField.DATABASE_NAME,
- MetaField.SQL_TYPE, MetaField.PK_NAMES, MetaField.TS, MetaField.OP_TS, MetaField.IS_DDL,
- MetaField.MYSQL_TYPE, MetaField.BATCH_ID, MetaField.UPDATE_BEFORE, MetaField.DATA);
+ return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.OP_TYPE,
+ MetaField.DATABASE_NAME, MetaField.SQL_TYPE, MetaField.PK_NAMES, MetaField.TS,
+ MetaField.OP_TS, MetaField.IS_DDL, MetaField.MYSQL_TYPE, MetaField.BATCH_ID,
+ MetaField.UPDATE_BEFORE, MetaField.DATA_CANAL, MetaField.DATA);
}
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNodeTest.java
index 9a1248642..19811532b 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNodeTest.java
@@ -49,7 +49,7 @@ public class MySqlExtractNodeTest extends SerializeBaseTest<MySqlExtractNode> {
Map<MetaField, String> formatMap = new HashMap<>();
formatMap.put(MetaField.PROCESS_TIME, "AS PROCTIME()");
formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 'meta.table_name' VIRTUAL");
- formatMap.put(MetaField.DATA, "STRING METADATA FROM 'meta.data' VIRTUAL");
+ formatMap.put(MetaField.DATA_CANAL, "STRING METADATA FROM 'meta.data_canal' VIRTUAL");
formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'meta.database_name' VIRTUAL");
formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'meta.op_type' VIRTUAL");
formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 'meta.op_ts' VIRTUAL");
@@ -60,7 +60,12 @@ public class MySqlExtractNodeTest extends SerializeBaseTest<MySqlExtractNode> {
formatMap.put(MetaField.PK_NAMES, "ARRAY<STRING> METADATA FROM 'meta.pk_names' VIRTUAL");
formatMap.put(MetaField.BATCH_ID, "BIGINT METADATA FROM 'meta.batch_id' VIRTUAL");
formatMap.put(MetaField.UPDATE_BEFORE, "ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before' VIRTUAL");
- formatMap.put(MetaField.DATA_BYTES, "BYTES METADATA FROM 'meta.data' VIRTUAL");
+ formatMap.put(MetaField.DATA_BYTES_DEBEZIUM, "BYTES METADATA FROM 'meta.data_debezium' VIRTUAL");
+ formatMap.put(MetaField.DATA_DEBEZIUM, "STRING METADATA FROM 'meta.data_debezium' VIRTUAL");
+ formatMap.put(MetaField.DATA_BYTES_CANAL, "BYTES METADATA FROM 'meta.data_canal' VIRTUAL");
+ formatMap.put(MetaField.DATA, "STRING METADATA FROM 'meta.data_canal' VIRTUAL");
+ formatMap.put(MetaField.DATA_BYTES, "BYTES METADATA FROM 'meta.data_canal' VIRTUAL");
+
MySqlExtractNode node = getTestObject();
boolean formatEquals = true;
for (MetaField metaField : node.supportedMetaFields()) {
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
index 52588d055..a9f495c28 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
@@ -50,7 +50,8 @@ public class KafkaLoadNodeTest extends SerializeBaseTest<KafkaLoadNode> {
public void testMetaFields() {
Map<MetaField, String> formatMap = new HashMap<>();
formatMap.put(MetaField.PROCESS_TIME, "AS PROCTIME()");
- formatMap.put(MetaField.DATA, "STRING METADATA FROM 'value.data'");
+ formatMap.put(MetaField.DATA_CANAL, "STRING METADATA FROM 'value.data_canal'");
+ formatMap.put(MetaField.DATA, "STRING METADATA FROM 'value.data_canal'");
formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 'value.table'");
formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'value.database'");
formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'value.op-type'");
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
index 5f79b4980..fcfb636ac 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
@@ -23,6 +23,7 @@ import io.debezium.data.Envelope;
import io.debezium.data.Envelope.FieldName;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
import java.util.LinkedHashMap;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.api.DataTypes;
@@ -35,6 +36,8 @@ import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.inlong.sort.cdc.debezium.table.MetadataConverter;
import org.apache.inlong.sort.formats.json.canal.CanalJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@@ -97,8 +100,27 @@ public enum MySqlReadableMetadata {
}
}),
+ DATA_DEFAULT( // "meta.data": the key DATA used before this change; emits the same canal json as DATA
+ "meta.data",
+ DataTypes.STRING(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+ // The record-only overload yields no value; the row-aware overload below builds the json.
+ @Override
+ public Object read(SourceRecord record) {
+ return null;
+ }
+
+ @Override
+ public Object read(SourceRecord record,
+ @Nullable TableChanges.TableChange tableSchema, RowData rowData) {
+ // construct canal json via the shared getCanalData helper (same output as DATA)
+ return getCanalData(record, (GenericRowData) rowData, tableSchema);
+ }
+ }),
+
DATA(
- "meta.data",
+ "meta.data_canal",
DataTypes.STRING(),
new MetadataConverter() {
private static final long serialVersionUID = 1L;
@@ -112,37 +134,49 @@ public enum MySqlReadableMetadata {
public Object read(SourceRecord record,
@Nullable TableChanges.TableChange tableSchema, RowData rowData) {
// construct canal json
- Struct messageStruct = (Struct) record.value();
- Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
- // tableName
- String tableName = getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY);
- // databaseName
- String databaseName = getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY);
- // opTs
- long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
- // ts
- long ts = (Long) messageStruct.get(FieldName.TIMESTAMP);
- // actual data
- GenericRowData data = (GenericRowData) rowData;
- Map<String, Object> field = (Map<String, Object>) data.getField(0);
- List<Map<String, Object>> dataList = new ArrayList<>();
- dataList.add(field);
-
- CanalJson canalJson = CanalJson.builder()
- .data(dataList).database(databaseName)
- .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
- .mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts)
- .type(getOpType(record)).sqlType(getSqlType(tableSchema)).build();
-
- try {
- ObjectMapper objectMapper = new ObjectMapper();
- return StringData.fromString(objectMapper.writeValueAsString(canalJson));
- } catch (Exception e) {
- throw new IllegalStateException("exception occurs when get meta data", e);
- }
+ return getCanalData(record, (GenericRowData) rowData, tableSchema);
}
}),
+ DATA_DEBEZIUM( // "meta.data_debezium": the changed row serialized as a DebeziumJson string
+ "meta.data_debezium",
+ DataTypes.STRING(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+ // The record-only overload yields no value; the row-aware overload below builds the json.
+ @Override
+ public Object read(SourceRecord record) {
+ return null;
+ }
+
+ @Override
+ public Object read(SourceRecord record,
+ @Nullable TableChanges.TableChange tableSchema, RowData rowData) {
+ // construct debezium json: only after/source/tsMs/op/tableChange are set; before stays null
+ Struct messageStruct = (Struct) record.value();
+ Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+ GenericRowData data = (GenericRowData) rowData;
+ Map<String, Object> field = (Map<String, Object>) data.getField(0); // assumes field 0 holds the row as a column -> value map
+ // Source block: database, table, server name, plus key and column-type info from the schema.
+ Source source = Source.builder().db(getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY))
+ .table(getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY))
+ .name(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY))
+ .sqlType(getSqlType(tableSchema))
+ .pkNames(getPkNames(tableSchema))
+ .mysqlType(getMysqlType(tableSchema))
+ .build();
+ DebeziumJson debeziumJson = DebeziumJson.builder().after(field).source(source)
+ .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getOpType(record)) // NOTE(review): confirm getOpType yields the op values consumers expect (Debezium itself uses c/u/d/r)
+ .tableChange(tableSchema).build(); // tableSchema may be null here (@Nullable parameter)
+ // tsMs above is the source struct's timestamp, not the envelope's top-level ts field.
+ try {
+ return StringData.fromString(OBJECT_MAPPER.writeValueAsString(debeziumJson));
+ } catch (Exception e) {
+ throw new IllegalStateException("exception occurs when get meta data", e);
+ }
+ }
+ }),
+
/**
* Name of the table that contain the row. .
*/
@@ -379,9 +413,41 @@ public enum MySqlReadableMetadata {
}
});
+ private static StringData getCanalData(SourceRecord record, GenericRowData rowData, // builds the canal-json string shared by DATA and DATA_DEFAULT
+ TableChange tableSchema) { // tableSchema may be null: both callers receive it as a @Nullable parameter
+ Struct messageStruct = (Struct) record.value();
+ Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+ // table name, read via getMetaData
+ String tableName = getMetaData(record, AbstractSourceInfo.TABLE_NAME_KEY);
+ // database name, read via getMetaData
+ String databaseName = getMetaData(record, AbstractSourceInfo.DATABASE_NAME_KEY);
+ // opTs: the source struct's timestamp, passed to the builder's es(...)
+ long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
+ // ts: the envelope's top-level timestamp, passed to the builder's ts(...)
+ long ts = (Long) messageStruct.get(FieldName.TIMESTAMP);
+ // actual data: field 0 of the row is assumed to hold a column -> value map
+ GenericRowData data = rowData; // NOTE(review): redundant alias of rowData; could be dropped
+ Map<String, Object> field = (Map<String, Object>) data.getField(0);
+ List<Map<String, Object>> dataList = new ArrayList<>();
+ dataList.add(field);
+ // Canal envelope: sql is always "" and isDdl always false on this path.
+ CanalJson canalJson = CanalJson.builder()
+ .data(dataList).database(databaseName)
+ .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
+ .mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts)
+ .type(getOpType(record)).sqlType(getSqlType(tableSchema)).build();
+ // Serialize with the shared OBJECT_MAPPER; any failure is rethrown unchecked with its cause.
+ try {
+ return StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
+ } catch (Exception e) {
+ throw new IllegalStateException("exception occurs when get meta data", e);
+ }
+ }
+
private final String key;
private final DataType dataType;
private final MetadataConverter converter;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
MySqlReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
this.key = key;
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index 2e4f45e6d..7b80c22ea 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -68,7 +68,7 @@ public class AllMigrateTest {
private MySqlExtractNode buildAllMigrateExtractNodeWithBytesFormat() {
List<FieldInfo> fields = Collections.singletonList(
- new MetaFieldInfo("data", MetaField.DATA_BYTES));
+ new MetaFieldInfo("data", MetaField.DATA_BYTES_DEBEZIUM));
Map<String, String> option = new HashMap<>();
option.put("append-mode", "true");
option.put("migrate-all", "true");
diff --git a/inlong-sort/sort-formats/format-json/pom.xml b/inlong-sort/sort-formats/format-json/pom.xml
index 2c3da00ab..c317bfebb 100644
--- a/inlong-sort/sort-formats/format-json/pom.xml
+++ b/inlong-sort/sort-formats/format-json/pom.xml
@@ -75,6 +75,10 @@
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-core</artifactId>
+ </dependency>
</dependencies>
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
new file mode 100644
index 000000000..51d60069d
--- /dev/null
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/debezium/DebeziumJson.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.json.debezium;
+
+import io.debezium.relational.history.TableChanges;
+import java.util.List;
+import java.util.Map;
+import lombok.Builder;
+import lombok.Data;
+
+@Data // Lombok: generates getters/setters, equals/hashCode and toString
+@Builder // Lombok: instances are built via DebeziumJson.builder()...build()
+public class DebeziumJson { // Debezium-style change-event envelope, serialized to JSON for "meta.data_debezium"
+ // Row images. The MySQL producer in this commit sets only after; before is left null.
+ private Map<String, String> before; // NOTE(review): typed Map<String, String> while after is Map<String, Object> — confirm intended
+ private Map<String, Object> after;
+ private Source source; // where the change originated (server, database, table, key/type info)
+ private TableChanges.TableChange tableChange; // Debezium table schema for the row; the MySQL producer may pass null
+ private long tsMs; // set from the source struct's timestamp; NOTE(review): default Jackson naming likely emits "tsMs", not Debezium's "ts_ms" — confirm
+ private String op; // operation type; the MySQL producer fills it from getOpType(record)
+ // Source block: origin of the change plus primary-key and column-type information.
+ @Builder
+ @Data
+ public static class Source {
+ private String name; // set from the source struct's server-name field by the MySQL producer
+ private String db;
+ private String table;
+ private List<String> pkNames; // primary-key column names (from getPkNames in the MySQL producer)
+ private Map<String, Integer> sqlType; // presumably column name -> java.sql.Types code — confirm
+ private Map<String, String> mysqlType; // presumably column name -> MySQL column type — confirm
+ }
+
+}