You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/11/06 00:41:31 UTC

[inlong] branch master updated: [INLONG-6370][Sort] The op type in debezium format should be u or update (#6408)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fbf04550 [INLONG-6370][Sort] The op type in debezium format should be u or update (#6408)
1fbf04550 is described below

commit 1fbf0455046f8d066337893225862245b5c1adfe
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Sun Nov 6 08:41:25 2022 +0800

    [INLONG-6370][Sort] The op type in debezium format should be u or update (#6408)
---
 .../sort/cdc/mysql/table/MySqlReadableMetadata.java | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
index fcfb636ac..bff2cc284 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
@@ -166,7 +166,7 @@ public enum MySqlReadableMetadata {
                     .mysqlType(getMysqlType(tableSchema))
                     .build();
                 DebeziumJson debeziumJson = DebeziumJson.builder().after(field).source(source)
-                        .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getOpType(record))
+                        .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(record))
                     .tableChange(tableSchema).build();
 
                 try {
@@ -237,7 +237,7 @@ public enum MySqlReadableMetadata {
 
                 @Override
                 public Object read(SourceRecord record) {
-                    return StringData.fromString(getOpType(record));
+                    return StringData.fromString(getCanalOpType(record));
                 }
             }),
 
@@ -435,7 +435,7 @@ public enum MySqlReadableMetadata {
             .data(dataList).database(databaseName)
             .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
             .mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts)
-            .type(getOpType(record)).sqlType(getSqlType(tableSchema)).build();
+            .type(getCanalOpType(record)).sqlType(getSqlType(tableSchema)).build();
 
         try {
             return StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
@@ -455,7 +455,7 @@ public enum MySqlReadableMetadata {
         this.converter = converter;
     }
 
-    private static String getOpType(SourceRecord record) {
+    private static String getCanalOpType(SourceRecord record) {
         String opType;
         final Envelope.Operation op = Envelope.operationFor(record);
         if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
@@ -468,6 +468,19 @@ public enum MySqlReadableMetadata {
         return opType;
     }
 
+    private static String getDebeziumOpType(SourceRecord record) {
+        String opType;
+        final Envelope.Operation op = Envelope.operationFor(record);
+        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+            opType = "c";
+        } else if (op == Envelope.Operation.DELETE) {
+            opType = "d";
+        } else {
+            opType = "u";
+        }
+        return opType;
+    }
+
     private static List<String> getPkNames(@Nullable TableChanges.TableChange tableSchema) {
         if (tableSchema == null) {
             return null;