You are viewing a plain text version of this content. The canonical link to the original message is not preserved in this plain text version.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/11/06 00:41:31 UTC
[inlong] branch master updated: [INLONG-6370][Sort] The op type in debezium format should be u or update (#6408)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1fbf04550 [INLONG-6370][Sort] The op type in debezium format should be u or update (#6408)
1fbf04550 is described below
commit 1fbf0455046f8d066337893225862245b5c1adfe
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Sun Nov 6 08:41:25 2022 +0800
[INLONG-6370][Sort] The op type in debezium format should be u or update (#6408)
---
.../sort/cdc/mysql/table/MySqlReadableMetadata.java | 21 +++++++++++++++++----
1 file changed, 17 insertions(+), 4 deletions(-)
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
index fcfb636ac..bff2cc284 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
@@ -166,7 +166,7 @@ public enum MySqlReadableMetadata {
.mysqlType(getMysqlType(tableSchema))
.build();
DebeziumJson debeziumJson = DebeziumJson.builder().after(field).source(source)
- .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getOpType(record))
+ .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(record))
.tableChange(tableSchema).build();
try {
@@ -237,7 +237,7 @@ public enum MySqlReadableMetadata {
@Override
public Object read(SourceRecord record) {
- return StringData.fromString(getOpType(record));
+ return StringData.fromString(getCanalOpType(record));
}
}),
@@ -435,7 +435,7 @@ public enum MySqlReadableMetadata {
.data(dataList).database(databaseName)
.sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
.mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts)
- .type(getOpType(record)).sqlType(getSqlType(tableSchema)).build();
+ .type(getCanalOpType(record)).sqlType(getSqlType(tableSchema)).build();
try {
return StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
@@ -455,7 +455,7 @@ public enum MySqlReadableMetadata {
this.converter = converter;
}
- private static String getOpType(SourceRecord record) {
+ private static String getCanalOpType(SourceRecord record) {
String opType;
final Envelope.Operation op = Envelope.operationFor(record);
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
@@ -468,6 +468,19 @@ public enum MySqlReadableMetadata {
return opType;
}
+ private static String getDebeziumOpType(SourceRecord record) {
+ String opType;
+ final Envelope.Operation op = Envelope.operationFor(record);
+ if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+ opType = "c";
+ } else if (op == Envelope.Operation.DELETE) {
+ opType = "d";
+ } else {
+ opType = "u";
+ }
+ return opType;
+ }
+
private static List<String> getPkNames(@Nullable TableChanges.TableChange tableSchema) {
if (tableSchema == null) {
return null;